
Add streaming search with configurable scoring modes #19176

Open
atris wants to merge 70 commits into opensearch-project:main from atris:streaming-scoring-clean

Conversation

Contributor

@atris atris commented Aug 28, 2025

Summary

Implement streaming search on the coordinator, emitting early partial results from the query phase with optional scoring. The change introduces request flags and a mode selector, integrates streaming into the existing SearchAction
path, and adds a reproducible TTFB benchmark. When streaming is not used, behavior is unchanged.

Motivation

• Reduce time-to-first-byte (TTFB) at the coordinator by not waiting for all shards to complete the query phase before starting fetch-eligible work.
• Provide mode-specific controls for batching and scoring, with safe defaults.
• Keep backward compatibility on the transport wire and preserve REST semantics.

Design and Scope

• Request flags and mode
• SearchRequest gains version-gated fields (V_3_3_0): streamingScoring (boolean) and streamingSearchMode (string).
• REST: stream=true enables streaming; optional stream_scoring_mode and streaming_mode select behavior.
• No change to default behavior; streaming is opt‑in.
• Coordinator streaming
• Streaming is integrated into TransportSearchAction (SearchAction). No separate transport action is required.
• SearchPhaseController.newSearchPhaseResults(...) returns either the existing QueryPhaseResultConsumer or a StreamQueryPhaseResultConsumer based on the request mode.
• StreamQueryPhaseResultConsumer controls partial reduce cadence via mode-specific multipliers and emits TopDocs-aware partials to the progress listener.
• Partial reduce notifications
• SearchProgressListener gains a TopDocs-aware hook with a compatibility fallback:
• onPartialReduceWithTopDocs(…) → defaults to onPartialReduce(…).
• notifyPartialReduceWithTopDocs(…) invokes the hook safely.
• Existing listeners are unaffected.
• Query execution
• For streaming queries, the QueryPhase routes to streaming collector contexts based on StreamingSearchMode:
• NO_SCORING: unsorted documents, fastest emission.
• SCORED_UNSORTED: scored documents without sort.
• SCORED_SORTED: scored, sorted via Lucene’s top-N collectors.
• CONFIDENCE_BASED: early emission guided by simple Hoeffding-style bounds.
• Collector batch size is bounded and read via SearchContext.getStreamingBatchSize(); partial batches are emitted to the stream channel when available.
• Transport integration
• Both the classic and stream transport handlers are registered:
• Classic: SearchTransportService.registerRequestHandler(…).
• Stream (if available): StreamSearchTransportService.registerStreamRequestHandler(…).
• The streaming transport path is selected only for streaming requests, and thread pools are chosen accordingly.
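The listener-compatibility pattern described above (a new TopDocs-aware hook that defaults to the existing callback, so current listeners keep working) can be sketched in plain Java. Names mirror the PR description, but this is a standalone illustration, not the actual OpenSearch classes; a List of doc ids stands in for TopDocs:

```java
import java.util.List;

// Standalone sketch of the backward-compatible listener hook pattern:
// the new default method falls back to the pre-existing callback, so
// implementations written against the old interface are unaffected.
interface ProgressListener {
    // Pre-existing hook.
    void onPartialReduce(int reducePhase);

    // New TopDocs-aware hook; defaults to the old behavior.
    // (docIds is a stand-in for the TopDocs payload.)
    default void onPartialReduceWithTopDocs(int reducePhase, List<Integer> docIds) {
        onPartialReduce(reducePhase);
    }
}

public class ListenerCompatDemo {
    static String lastEvent = "";

    public static void main(String[] args) {
        // A legacy listener that only implements the old hook.
        ProgressListener legacy = phase -> lastEvent = "partialReduce:" + phase;

        // Invoking the new hook falls through to the old one.
        legacy.onPartialReduceWithTopDocs(3, List.of(1, 2, 3));
        System.out.println(lastEvent);
    }
}
```

This is why the PR can add `onPartialReduceWithTopDocs(…)` without breaking existing `SearchProgressListener` implementations: the default method keeps the old contract intact.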

Settings and Controls

• Dynamic cluster settings for streaming are added (StreamingSearchSettings, node-scoped, dynamic). Examples:
• search.streaming.batch_size
• Mode-specific reduce multipliers, emission interval, and minimal doc thresholds
• Circuit breaker and limits for buffering in streaming code paths
• Defaults are conservative. The feature remains opt-in via request flags; settings do not change behavior unless the request is streaming.
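Since these settings are node-scoped and dynamic, they should be adjustable at runtime through the standard cluster settings API. A hypothetical update of the one setting named above (`search.streaming.batch_size`); the value 500 is illustrative only:

```json
PUT _cluster/settings
{
  "transient": {
    "search.streaming.batch_size": 500
  }
}
```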

Wire Compatibility and API

• Transport wire BWC
• New SearchRequest and ShardSearchRequest fields are gated by Version.V_3_3_0 on read/write. Older peers neither write nor read these fields.
• Public API
• No breaking changes to REST endpoints.
• SearchProgressListener adds new methods with safe defaults; existing code continues to compile and run.
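The version-gating invariant (the writer and reader consult the same version check, so older peers neither write nor read the new fields) can be illustrated with a minimal, self-contained sketch. Plain `java.io` streams stand in for OpenSearch's StreamOutput/StreamInput, and an integer stands in for `Version.V_3_3_0`:

```java
import java.io.*;

// Minimal sketch of version-gated wire fields. An int models Version.V_3_3_0;
// DataOutput/DataInput model StreamOutput/StreamInput.
public class VersionGateDemo {
    static final int V_3_3_0 = 3_03_00;

    static void writeRequest(DataOutputStream out, int peerVersion,
                             boolean streamingScoring, String mode) throws IOException {
        out.writeUTF("query:*");                 // pre-existing field, always written
        if (peerVersion >= V_3_3_0) {            // gate: only for 3.3.0+ peers
            out.writeBoolean(streamingScoring);
            out.writeBoolean(mode != null);
            if (mode != null) out.writeUTF(mode);
        }
    }

    static String readRequest(DataInputStream in, int peerVersion) throws IOException {
        String query = in.readUTF();
        boolean streamingScoring = false;        // defaults for older peers
        String mode = null;
        if (peerVersion >= V_3_3_0) {            // same gate on the read side
            streamingScoring = in.readBoolean();
            if (in.readBoolean()) mode = in.readUTF();
        }
        return query + "|" + streamingScoring + "|" + mode;
    }

    static String roundTrip(int peerVersion) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeRequest(new DataOutputStream(buf), peerVersion, true, "SCORED_SORTED");
        return readRequest(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), peerVersion);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip(3_03_00)); // new peer sees the streaming fields
        System.out.println(roundTrip(3_02_00)); // old peer: fields neither written nor read
    }
}
```

If the two version checks ever diverge, the reader consumes the wrong bytes and silently corrupts the stream, which is why a reviewer comment later in this thread flags exactly that risk for QuerySearchResult.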

Tests and Benchmark

• Unit tests:
• Stream consumer batch sizing and dynamic settings effects.
• Hoeffding bounds behavior.
• Integration tests:
• Basic streaming search workflows.
• Streaming aggregations with and without sub-aggregations.
• Mode coverage (NO_SCORING, SCORED_UNSORTED, SCORED_SORTED, CONFIDENCE_BASED).
• Benchmark:
• StreamingPerformanceBenchmarkTests: measures coordinator-side TTFB (time to first partial reduce) vs. classic full reduce for a large query.
• Logger-only reporting; no REST streaming is introduced.

Non-Goals / Limitations

• This change does not implement HTTP/REST streaming of partial responses.
• The SearchResponse partial/sequence metadata used internally by the streaming listener is not serialized on the wire and does not alter REST payloads.
• Confidence-based mode uses a conservative and simple bound; it is adequate for early gating but not a full ranking stability analysis.
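For reference, a Hoeffding-style bound of the kind alluded to gives a confidence half-width of epsilon = range * sqrt(ln(1/delta) / (2n)) after n observations. The sketch below is a standalone illustration of such a gate, not the PR's HoeffdingBounds class; the emission rule comparing the bound to a score gap is an assumed example:

```java
// Standalone sketch of a Hoeffding-style confidence half-width, the kind of
// bound a confidence-based early-emission gate can use:
//   epsilon = range * sqrt(ln(1/delta) / (2n))
public class HoeffdingSketch {
    static double epsilon(long n, double range, double delta) {
        return range * Math.sqrt(Math.log(1.0 / delta) / (2.0 * n));
    }

    // Hypothetical gate: emit early once the bound is tight relative to the
    // observed score gap between rank k and rank k+1.
    static boolean canEmit(long n, double range, double delta, double scoreGap) {
        return 2 * epsilon(n, range, delta) < scoreGap;
    }

    public static void main(String[] args) {
        // The bound tightens as more documents are observed.
        System.out.printf("n=100:   eps=%.4f%n", epsilon(100, 1.0, 0.05));
        System.out.printf("n=10000: eps=%.4f%n", epsilon(10_000, 1.0, 0.05));
    }
}
```

As the summary notes, this is a conservative gate on ranking stability, not a full analysis: it bounds the deviation of an empirical mean, which is adequate for deciding when a partial batch is safe to emit.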

Backward Compatibility and Risk

• Default behavior unchanged unless streaming flags are provided.
• Wire BWC ensured via version gating; JApiCmp passes.
• Aggregation partial reductions are unaffected; for TopDocs partials we call the new TopDocs-aware hook, otherwise we continue to notify via the existing method.

Operational Notes

• Streaming is disabled by default and must be explicitly requested with stream=true (REST) or by setting SearchRequest flags programmatically.
• Mode selection allows tuning for latency vs. coordination cost.
• Dynamic settings enable safe runtime tuning if necessary.

If reviewers prefer, I can split the settings and the confidence-based collector into a follow-up to further reduce the initial surface.

Summary by CodeRabbit

  • New Features

    • Streaming search extended to delete-by-query flows and safer REST channel lifecycle management for streaming requests.
  • Bug Fixes

    • More reliable aggregation profiling type resolution when profiling wrappers are present.
    • Safer handling around post-collection aggregation building to avoid null/chain issues.
  • Tests

    • New unit and integration tests for streaming search, channel lifecycle/untracking, streaming aggregators, and streaming collector selection.


@github-actions
Contributor

❌ Gradle check result for 5554606: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@atris atris closed this Aug 29, 2025
@atris atris reopened this Aug 29, 2025

@atris atris closed this Aug 29, 2025
@atris atris reopened this Aug 29, 2025

  Introduces streaming search infrastructure that enables progressive emission
  of search results with three configurable scoring modes. The implementation
  extends the existing streaming transport layer to support partial result
  computation at the coordinator level.

  Scoring modes:
  - NO_SCORING: Immediate result emission without confidence requirements
  - CONFIDENCE_BASED: Statistical emission using Hoeffding inequality bounds
  - FULL_SCORING: Complete scoring before result emission

  The implementation leverages OpenSearch's inter-node streaming capabilities
  to reduce query latency through early result emission. Partial reductions
  are triggered based on the selected scoring mode, with results accumulated
  at the coordinator before final response generation.

  Key changes:
  - Add HoeffdingBounds for statistical confidence calculation
  - Extend QueryPhaseResultConsumer to support streaming reduction
  - Add StreamingScoringCollector wrapping TopScoreDocCollector
  - Integrate streaming scorer selection in QueryPhase
  - Add REST parameter stream_scoring_mode for mode selection
  - Include streaming metadata in SearchResponse

  The current implementation operates within architectural constraints where
  streaming is limited to inter-node communication. Client-facing streaming
  will be addressed in a follow-up contribution.

  Addresses opensearch-project#18725

Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
@github-actions
Contributor

github-actions bot commented Mar 3, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 737d8a4.

'Diff too large, requires skip by maintainers after manual review'


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

}
}
final DiscoveryNodes nodes = clusterState.nodes();
final boolean isStreamingCandidate = (searchRequest.isStreamingScoring() || searchRequest.getStreamingSearchMode() != null)
Contributor


see if you can move this logic to RestSearchAction

Contributor Author


Done. TransportSearchAction still owns transport connection selection
(SearchTransportService vs StreamSearchTransportService), since it is shard-state dependent.

logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
}
}

Contributor


check if you can make use of StreamTransportService and port all this logic to it

@github-actions
Contributor

github-actions bot commented Mar 6, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 0f5c5c2.

'Diff too large, requires skip by maintainers after manual review'



@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 8517a7c.

'Diff too large, requires skip by maintainers after manual review'



@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 9e4f84f.

'Diff too large, requires skip by maintainers after manual review'




Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 6fc677b.

'Diff too large, requires skip by maintainers after manual review'



Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit b7f3128.

'Diff too large, requires skip by maintainers after manual review'



@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit e3fe3ae.

'Diff too large, requires skip by maintainers after manual review'



Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 2e09c6b.

'Diff too large, requires skip by maintainers after manual review'



Atri Sharma added 2 commits March 18, 2026 13:54
Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 0092c49.

• plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java, line 162 (low): INFO-level logging (not trace/debug) added for every batch sent, including task.response() toString, which could serialise partial result metadata to production logs at high volume. Not exfiltration, but atypically verbose for a hot path and could expose internal state in log aggregation systems.
• server/src/main/java/org/opensearch/action/ActionModule.java, line 741 (low): The feature-flag guard (FeatureFlags.STREAM_TRANSPORT) protecting StreamSearchAction registration was removed. The streaming search action is now always registered regardless of whether the experimental feature flag is enabled, potentially enabling unvetted experimental code paths in production clusters without explicit opt-in.
• server/src/main/java/org/opensearch/search/SearchService.java, line 841 (low): The call to getExecutor(executorName, shard) is replaced with hardcoded executor selection that ignores the executorName parameter passed by callers. This bypasses any custom executor routing logic, which could subvert thread-pool admission controls or throttling policies intended for specific search paths.

The table above displays the top 10 most important findings.

Total: 3 | Critical: 0 | High: 0 | Medium: 0 | Low: 3



@github-actions
Contributor

github-actions bot commented Mar 18, 2026

PR Reviewer Guide 🔍

(Review updated until commit 0998491)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add streaming mode fields and flush mode management to SearchContext

Relevant files:

  • server/src/main/java/org/opensearch/search/internal/SearchContext.java
  • server/src/main/java/org/opensearch/search/DefaultSearchContext.java
  • server/src/test/java/org/opensearch/search/DefaultSearchContextStreamingTests.java

Sub-PR theme: Update and fix streaming aggregation tests

Relevant files:

  • plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java
  • server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java
  • server/src/test/java/org/opensearch/action/search/StreamQueryPhaseResultConsumerTests.java

Sub-PR theme: FlightOutboundHandler logging and comment cleanup

Relevant files:

  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java

⚡ Recommended focus areas for review

Debug Logging

Production code contains logger.info(...) statements logging internal details of every batch sent (requestId, action, response, row count) and QuerySearchResult aggregation state. These should be at DEBUG or TRACE level, or removed entirely before merging, as they will flood logs in production.

if (task.response() instanceof org.opensearch.search.query.QuerySearchResult) {
    logger.info(
        "QuerySearchResult hasAggs: {}",
        ((org.opensearch.search.query.QuerySearchResult) task.response()).hasAggs()
    );
}
logger.info(
    "Sending batch for requestId [{}], action [{}], items [{}], rows [{}]",
    task.requestId(),
    task.action(),
    task.response(),
    out.getRoot().getRowCount()
);
SetOnce Misuse

setFlushMode calls cachedFlushMode.set(flushMode) on a SetOnce field. SetOnce.set() throws AlreadySetException if the value has already been set, making this method unsafe to call more than once. This could cause unexpected exceptions if the flush mode is updated after being initially set. Consider using an AtomicReference instead if overwriting is intended.

public void setFlushMode(FlushMode flushMode) {
    cachedFlushMode.set(flushMode);
}
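The suggested fix can be sketched in isolation: AtomicReference permits overwriting, whereas a set-once container throws on the second write. A minimal stand-in for Lucene's SetOnce is used below so the snippet is self-contained (the real class throws AlreadySetException; the stand-in uses IllegalStateException):

```java
import java.util.concurrent.atomic.AtomicReference;

// Contrast of the two containers from the review comment. MiniSetOnce is a
// minimal stand-in for Lucene's SetOnce, which rejects a second set().
public class FlushModeDemo {
    enum FlushMode { PER_SEGMENT, PER_BATCH }

    static final class MiniSetOnce<T> {
        private T value;
        synchronized void set(T v) {
            if (value != null) throw new IllegalStateException("already set");
            value = v;
        }
    }

    // If the flush mode may legitimately change after first being set,
    // AtomicReference is the safe choice: set() may be called repeatedly.
    private final AtomicReference<FlushMode> cachedFlushMode = new AtomicReference<>();

    public void setFlushMode(FlushMode flushMode) {
        cachedFlushMode.set(flushMode);
    }

    public FlushMode getFlushMode() {
        return cachedFlushMode.get();
    }

    public static void main(String[] args) {
        FlushModeDemo ctx = new FlushModeDemo();
        ctx.setFlushMode(FlushMode.PER_SEGMENT);
        ctx.setFlushMode(FlushMode.PER_BATCH);   // fine with AtomicReference
        System.out.println(ctx.getFlushMode());

        MiniSetOnce<FlushMode> once = new MiniSetOnce<>();
        once.set(FlushMode.PER_SEGMENT);
        try {
            once.set(FlushMode.PER_BATCH);       // second set: rejected
        } catch (IllegalStateException e) {
            System.out.println("second set rejected");
        }
    }
}
```

If the flush mode is truly write-once by design, the alternative fix is to guard the caller so set() is reached at most once; the choice depends on the intended lifecycle, which the review asks the author to clarify.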
Weak Test Coverage

The original comprehensive test (testStreamingAggregationFromMultipleShards) that validated multi-shard streaming, partial reduce notifications, and final aggregation correctness has been removed and replaced with a single trivial test (testStreamingConsumerBatchSizes) that only checks a hard-coded batch size. The createMockResult helper is defined but never used. This significantly reduces confidence in the streaming consumer's correctness.

public void testStreamingConsumerBatchSizes() {
    SearchRequest request = new SearchRequest();
    request.setStreamingSearchMode(StreamingSearchMode.NO_SCORING.toString());

    StreamQueryPhaseResultConsumer consumer = new StreamQueryPhaseResultConsumer(
        request,
        threadPool.executor(ThreadPool.Names.SEARCH),
        circuitBreaker,
        searchPhaseController,
        SearchProgressListener.NOOP,
        namedWriteableRegistry,
        10,
        exc -> {}
    );

    int batchSize = consumer.getBatchReduceSize(100, 10);
    assertEquals(1, batchSize);
}

private SearchPhaseResult createMockResult(QuerySearchResult qResult, SearchShardTarget target, int index) {
    return new SearchPhaseResult() {
        @Override
        public QuerySearchResult queryResult() {
            return qResult;
        }

        @Override
        public SearchShardTarget getSearchShardTarget() {
            return target;
        }

        @Override
        public void setSearchShardTarget(SearchShardTarget shardTarget) {}

        @Override
        public int getShardIndex() {
            return index;
        }

        @Override
        public void setShardIndex(int shardIndex) {}
    };
}
Empty catch-rethrow

Multiple test methods contain try { resp = future.actionGet(...); } catch (Exception e) { throw e; } blocks that add no value — they simply rethrow the caught exception. These should be removed and the actionGet call made directly, or the catch block should add meaningful handling/logging.

try {
    resp = future.actionGet(TimeValue.timeValueSeconds(35));
} catch (Exception e) {
    throw e;
}
Behavior Change

The testBuildAggregationsBatchReset test assertion was changed from equalTo(1L) to equalTo(2L) with a comment saying "we now preserve state in doReset for streaming, so counts should accumulate." This is a semantic change in the aggregator's reset behavior. Accumulating state across resets may cause incorrect results in scenarios where a true reset is expected (e.g., between different queries or contexts). This behavioral change needs careful validation.

// We now preserve state in doReset for streaming, so counts should accumulate
assertThat(secondResult.getBuckets().get(0).getDocCount(), equalTo(2L));

@github-actions
Contributor

github-actions bot commented Mar 18, 2026

PR Code Suggestions ✨

Latest suggestions up to 0998491
Explore these optional code suggestions:

Possible issue
Settings scope change breaks runtime configurability

These settings were changed from NodeScope to IndexScope, but ClusterSettings.java
simultaneously removes them from the built-in cluster settings list. Index-scoped
settings must be registered in IndexScopedSettings, not ClusterSettings. If they are
not registered anywhere, any attempt to set them will throw an
IllegalArgumentException about unknown settings, making the feature completely
unconfigurable at runtime.

server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java [72-108]

+// Keep as NodeScope + Dynamic so they remain in ClusterSettings (or register
+// them explicitly in IndexScopedSettings if index-level granularity is required).
 public static final Setting<Long> STREAMING_MAX_ESTIMATED_BUCKET_COUNT = Setting.longSetting(
-    "index.aggregation.streaming.max_estimated_bucket_count",
+    "search.aggregations.streaming.max_estimated_bucket_count",
     100_000L,
     1L,
-    Setting.Property.Dynamic,
-    Setting.Property.IndexScope
+    Setting.Property.NodeScope,
+    Setting.Property.Dynamic
 );
-...
+
 public static final Setting<Double> STREAMING_MIN_CARDINALITY_RATIO = Setting.doubleSetting(
-    "index.aggregation.streaming.min_cardinality_ratio",
+    "search.aggregations.streaming.min_cardinality_ratio",
     0.01,
     0.0,
     1.0,
-    Setting.Property.Dynamic,
-    Setting.Property.IndexScope
+    Setting.Property.NodeScope,
+    Setting.Property.Dynamic
 );
-...
+
 public static final Setting<Long> STREAMING_MIN_ESTIMATED_BUCKET_COUNT = Setting.longSetting(
-    "index.aggregation.streaming.min_estimated_bucket_count",
+    "search.aggregations.streaming.min_estimated_bucket_count",
     1000L,
     1L,
-    Setting.Property.Dynamic,
-    Setting.Property.IndexScope
+    Setting.Property.NodeScope,
+    Setting.Property.Dynamic
 );
Suggestion importance[1-10]: 8


Why: The PR changes these settings from NodeScope to IndexScope while simultaneously removing them from ClusterSettings, but they are not registered in IndexScopedSettings either. This would make the settings completely unregistered and throw IllegalArgumentException at runtime when attempting to use them, which is a critical correctness issue.

Medium
Fix race condition in streaming response completion

There is a race condition: two threads can both pass the completed.get() check
before either sets completed to true, leading to channel.sendResponseBatch() being
called concurrently and channel.completeStream() potentially being called twice. The
check-then-act should use compareAndSet or the method should be synchronized.

server/src/main/java/org/opensearch/action/support/StreamSearchChannelListener.java [53-66]

 public void onStreamResponse(Response response, boolean isLastBatch) {
     assert response != null;
-    if (completed.get()) {
-        // Ignore late responses after completion to avoid double-completion and task tracker mismatches
-        return;
-    }
-    channel.sendResponseBatch(response);
     if (isLastBatch) {
-        try {
-            channel.completeStream();
-        } finally {
-            completed.set(true);
+        if (!completed.compareAndSet(false, true)) {
+            return; // Already completed
         }
+        channel.sendResponseBatch(response);
+        channel.completeStream();
+    } else {
+        if (completed.get()) {
+            return;
+        }
+        channel.sendResponseBatch(response);
     }
 }
Suggestion importance[1-10]: 8


Why: There is a genuine TOCTOU race condition where two threads can both pass the completed.get() check before either sets completed to true, potentially causing sendResponseBatch and completeStream to be called concurrently. Using compareAndSet for the final batch is a correct fix for this concurrency issue.

Medium
Empty stream leaves coordinator shard slot permanently open

When the stream is empty (lastResult == null) and the listener is a
StreamSearchActionListener, neither onStreamResponse nor onFailure is called,
leaving the shard slot permanently open and the search hanging. The empty-stream
case should call onFailure (or a no-op complete response) for both listener types to
ensure the coordinator can make progress.

server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java [171-192]

 while ((currentResult = response.nextResponse()) != null) {
     if (streamingListener) {
         if (lastResult != null) {
             ((StreamSearchActionListener) listener).onStreamResponse(lastResult, false);
         }
         lastResult = currentResult;
     } else {
-        // Non-streaming: keep only the last (final) response
         lastResult = currentResult;
     }
 }
 
 if (lastResult != null) {
     if (streamingListener) {
         ((StreamSearchActionListener) listener).onStreamResponse(lastResult, true);
         logger.debug("Processed final stream response");
     } else {
         listener.onResponse(lastResult);
     }
 } else {
     logger.debug("Empty stream");
+    listener.onFailure(new IllegalStateException("Empty stream response from shard"));
 }
Suggestion importance[1-10]: 7


Why: When the stream is empty and the listener is a StreamSearchActionListener, neither onStreamResponse nor onFailure is called, which can leave the coordinator hanging indefinitely. The suggestion correctly identifies this gap and proposes calling onFailure to unblock the coordinator.

Medium
Fix index reader opened while writer still active

The test testSubAggregationPersistence uses a RandomIndexWriter but then opens the
reader inside the inner try-with-resources block, meaning the IndexWriter is still
open when the reader is used. This can cause issues with segment visibility. The
reader should be opened after the writer is closed or flushed, similar to other
tests in this file that use IndexWriter directly with an explicit commit().

server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java [1771-1786]

-// Reset for next batch
-aggregator.reset();
+try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
+    Document document1 = new Document();
+    document1.add(new NumericDocValuesField("category", 1));
+    document1.add(new NumericDocValuesField("price", 100));
+    indexWriter.addDocument(document1);
+    indexWriter.commit();
+}
+try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(directory))) {
+    // ... rest of test
+}
 
-// Execute Batch 2 (Empty)
-// If reset cleared state, Max would be lost (or invalid)
-aggregator.preCollection();
-// Search nothing
-indexSearcher.search(new org.apache.lucene.search.MatchNoDocsQuery(), aggregator);
-aggregator.postCollection();
-
-// Verify Final Result
-LongTerms finalResult = (LongTerms) aggregator.buildAggregations(new long[] { 0 })[0];
-assertThat(finalResult.getBuckets().size(), equalTo(1)); // Bucket for category 1 should still exist
-assertThat(finalResult.getBuckets().get(0).getDocCount(), equalTo(1L)); // Count should be 1
-
Suggestion importance[1-10]: 6


Why: The test uses RandomIndexWriter and opens the reader via indexWriter.getReader() inside the inner try block while the writer is still open. This can cause segment visibility issues. The suggested fix of committing and closing the writer before opening the reader is a more reliable pattern, consistent with other tests in the file that use IndexWriter.commit().

Low
Ensure streaming fields are explicitly reset for older wire versions

The partial and docIds fields are declared at the bottom of the class but are
read/written in the middle of readFromWithId/writeToNoId. Since Java initializes
fields in declaration order, partial defaults to false and docIds to null before
readFromWithId runs, which is fine. However, the fields must be declared before the
methods that use them to avoid any potential forward-reference issues in subclasses
or serialization frameworks. More critically, the version gate V_3_3_0 used here
must match exactly what is written in writeToNoId; if these two version checks ever
diverge, deserialization will silently corrupt the stream by reading wrong bytes.

server/src/main/java/org/opensearch/search/query/QuerySearchResult.java [383-390]

 if (in.getVersion().onOrAfter(Version.V_3_3_0)) {
     partial = in.readBoolean();
-    if (in.readBoolean()) {
+    boolean hasDocIds = in.readBoolean();
+    if (hasDocIds) {
         docIds = in.readList(StreamInput::readInt);
     } else {
         docIds = null;
     }
+} else {
+    partial = false;
+    docIds = null;
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that partial and docIds should be explicitly reset to their defaults when reading from older wire versions to avoid stale state. The improved_code adds an else branch that is a valid defensive improvement, though in practice Java field initialization already sets these defaults before readFromWithId is called.

Low
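The symmetry requirement behind this suggestion can be sketched outside OpenSearch with plain `java.io` streams. Everything here is illustrative: the numeric version id, the field names, and the stream classes are stand-ins for the real `Version`/`StreamInput`/`StreamOutput` wire classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch of a version-gated field pair, illustrating why the read and
// write paths must share the exact same version check.
public class VersionGatedResult {
    static final int V_3_3_0 = 30300; // hypothetical numeric version id

    boolean partial = false;
    List<Integer> docIds = null;

    void writeTo(DataOutputStream out, int wireVersion) throws IOException {
        if (wireVersion >= V_3_3_0) {
            out.writeBoolean(partial);
            out.writeBoolean(docIds != null);
            if (docIds != null) {
                out.writeInt(docIds.size());
                for (int id : docIds) out.writeInt(id);
            }
        }
        // older peers: write nothing, matching the reader's gate exactly
    }

    void readFrom(DataInputStream in, int wireVersion) throws IOException {
        if (wireVersion >= V_3_3_0) {
            partial = in.readBoolean();
            if (in.readBoolean()) {
                int n = in.readInt();
                docIds = new ArrayList<>();
                for (int i = 0; i < n; i++) docIds.add(in.readInt());
            } else {
                docIds = null;
            }
        } else {
            // explicit reset for old-version peers, as the suggestion recommends
            partial = false;
            docIds = null;
        }
    }

    static VersionGatedResult roundTrip(VersionGatedResult src, int wireVersion) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.writeTo(new DataOutputStream(bytes), wireVersion);
            VersionGatedResult dst = new VersionGatedResult();
            dst.partial = true; // stale state that readFrom must overwrite
            dst.readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), wireVersion);
            return dst;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        VersionGatedResult src = new VersionGatedResult();
        src.partial = true;
        src.docIds = Arrays.asList(1, 2, 3);
        // new wire: fields round-trip intact
        System.out.println(roundTrip(src, V_3_3_0).docIds);      // [1, 2, 3]
        // old wire: fields are explicitly reset, never left stale
        System.out.println(roundTrip(src, V_3_3_0 - 1).partial); // false
    }
}
```

If the reader's gate were, say, `V_3_3_0 - 1` while the writer's stayed at `V_3_3_0`, the reader would interpret unrelated trailing bytes as `partial`/`docIds` — the silent-corruption failure mode the suggestion warns about.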
Preserve sub-aggregation reset in streaming batch mode

Silently suppressing reset() entirely means sub-aggregations will never be reset
between streaming batches, which can cause incorrect cumulative results for
sub-aggregations that are not designed to accumulate state. If the intent is only to
preserve the HyperLogLog registers, consider selectively resetting sub-aggregations
while skipping the HLL counts reset.

server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java [100-106]

 @Override
 public void reset() {
-    // No-op to preserve state across streaming batches.
-    // We purposefully do NOT call super.reset() because that would:
-    // 1. Call doReset() (clearing bucket/doc counts)
-    // 2. Call collectableSubAggregators.reset() (clearing sub-aggregation state)
+    // Skip doReset() to preserve HLL registers across streaming batches.
+    // Still reset sub-aggregations to avoid stale state accumulation.
+    collectableSubAggregators.reset();
 }
Suggestion importance[1-10]: 6

__

Why: The no-op reset() silently skips resetting sub-aggregations, which could lead to incorrect cumulative results for sub-aggregations not designed to accumulate state. The suggestion to selectively reset sub-aggregations while preserving HLL registers is a valid concern, though the impact depends on whether sub-aggregations are actually used with cardinality aggregations in streaming mode.

Low
Fix conflated streaming mode detection methods

isStreamingSearch() and isStreamingModeRequested() have identical implementations in
the base class, both delegating to getStreamingMode() != null. This is misleading
since DefaultSearchContext overrides isStreamingModeRequested() to use a separate
streamingModeRequested field, but the base class default conflates the two concepts.
The base class isStreamingModeRequested() should not silently alias
isStreamingSearch() as this can cause subtle bugs in subclasses that don't override
both.

server/src/main/java/org/opensearch/search/internal/SearchContext.java [637-643]

 public boolean isStreamingSearch() {
     return getStreamingMode() != null;
 }
 
 public boolean isStreamingModeRequested() {
-    return getStreamingMode() != null;
+    // Subclasses should override this to track whether streaming was explicitly requested
+    return false;
 }
Suggestion importance[1-10]: 5

__

Why: The base class isStreamingSearch() and isStreamingModeRequested() have identical implementations, which is semantically misleading. While DefaultSearchContext overrides isStreamingModeRequested() with a separate field, the base class default conflating the two concepts could cause subtle bugs in other subclasses that only override one of them.

Low
Align streaming eligibility checks across aggregator paths

The streaming path in numericSupplier() checks context.isStreamSearch() &&
context.getFlushMode() == FlushMode.PER_SEGMENT, but the GLOBAL_ORDINALS path in
bytesSupplier() checks context.isStreamingModeRequested() with a different
condition. These inconsistent guard conditions mean numeric streaming may not
activate when isStreamingModeRequested() is true but isStreamSearch() is false (or
vice versa), leading to silent fallback to non-streaming aggregation without any
indication to the caller.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java [213-255]

-if (context.isStreamSearch() && context.getFlushMode() == FlushMode.PER_SEGMENT) {
-    if ((includeExclude != null) && (includeExclude.isRegexBased()) && format != DocValueFormat.RAW) {
-        ...
-    }
+if ((context.isStreamSearch() || context.isStreamingModeRequested()) && context.getFlushMode() == FlushMode.PER_SEGMENT) {
     ...
     return createStreamNumericTermsAggregator(...);
 }
 
-if ((includeExclude != null) && (includeExclude.isRegexBased()) && format != DocValueFormat.RAW) {
-    throw new AggregationExecutionException(...);
-}
-
Suggestion importance[1-10]: 5

__

Why: The numeric supplier uses context.isStreamSearch() while the bytes/string supplier uses context.isStreamingModeRequested() as the streaming guard condition. This inconsistency could cause numeric streaming to not activate in cases where string streaming would, leading to silent non-streaming fallback. However, the improved_code uses || which may be too permissive depending on the intended semantics.

Low
Fail fast when collector is not a BucketCollector

If AggregatorTreeEvaluator.evaluateAndRecreateIfNeeded returns a newly created
collector that is not a BucketCollector, preCollection() will be silently skipped,
leaving aggregators in an uninitialized state. The recreated collector from
MultiBucketCollector.wrap should always be a BucketCollector, so a hard failure (or
at minimum a warning) is preferable to a silent skip.

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java [76-81]

 Collector collector = MultiBucketCollector.wrap(aggProvider.apply(searchContext));
 collector = AggregatorTreeEvaluator.evaluateAndRecreateIfNeeded(collector, searchContext, aggProvider);
 if (collector instanceof BucketCollector) {
     ((BucketCollector) collector).preCollection();
+} else {
+    throw new IllegalStateException("Expected BucketCollector after evaluateAndRecreateIfNeeded, got: " + collector.getClass());
 }
 return collector;
Suggestion importance[1-10]: 5

__

Why: The silent skip of preCollection() when the collector is not a BucketCollector could leave aggregators uninitialized. However, since MultiBucketCollector.wrap and evaluateAndRecreateIfNeeded should always return a BucketCollector, this is more of a defensive programming concern than a likely bug.

Low
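The fail-fast pattern the suggestion proposes can be sketched in isolation. `Collector` and `BucketCollector` below are stand-in interfaces, not the Lucene/OpenSearch types:

```java
// Sketch: instead of silently skipping initialization when a wrapped object
// has an unexpected type, throw so the bug surfaces immediately.
public class FailFast {
    interface Collector {}
    interface BucketCollector extends Collector { default void preCollection() {} }

    static Collector requireBucketCollector(Collector c) {
        if (c instanceof BucketCollector) {
            ((BucketCollector) c).preCollection(); // initialize before use
            return c;
        }
        throw new IllegalStateException(
            "Expected BucketCollector, got: " + c.getClass().getName());
    }

    public static void main(String[] args) {
        requireBucketCollector(new BucketCollector() {}); // fine
        try {
            requireBucketCollector(new Collector() {});   // unexpected type
            throw new AssertionError("should have thrown");
        } catch (IllegalStateException expected) {
            System.out.println("failed fast: " + expected.getMessage());
        }
    }
}
```

The trade-off is as the "Why" notes: when the invariant is believed to always hold, the throw is defensive documentation rather than a bug fix, but it is still cheaper than debugging uninitialized aggregators later.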
General
Avoid degenerate single-item batch reduces

Returning 1 as the batch reduce size means every single shard result triggers a
reduce operation, which can cause excessive CPU usage and contention for large
fan-out queries. The minimum should be bounded by minBatchReduceSize to respect the
caller's lower bound contract, or at least use a sensible minimum (e.g., 2) to avoid
degenerate single-item reduces.

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java [50-53]

 @Override
 int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
-    // Reduce immediately for fastest TTFB
-    return Math.min(requestBatchedReduceSize, 1);
+    // Reduce as soon as possible for fastest TTFB, but respect the minimum batch size
+    return Math.max(minBatchReduceSize, Math.min(requestBatchedReduceSize, 2));
 }
Suggestion importance[1-10]: 5

__

Why: Returning 1 as the batch reduce size ignores minBatchReduceSize entirely, which could cause excessive reduce operations for large fan-out queries. The suggestion to respect the minimum batch size is reasonable, though the tradeoff between TTFB and CPU usage is a design decision.

Low
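The clamping arithmetic in the improved_code can be checked in isolation. Method and parameter names here are illustrative, not the actual consumer API:

```java
// Sketch of the proposed clamp: honor the caller's lower bound while still
// reducing early for TTFB.
public class BatchSize {
    static int clamped(int requestedBatchedReduceSize, int minBatchReduceSize) {
        // cap at 2 to favor early reduces, then raise to the caller's minimum;
        // if the request itself is 1 and the minimum allows it, 1 still wins
        return Math.max(minBatchReduceSize, Math.min(requestedBatchedReduceSize, 2));
    }

    public static void main(String[] args) {
        System.out.println(clamped(512, 5)); // 5: the caller's minimum wins
        System.out.println(clamped(512, 1)); // 2: avoids degenerate size-1 reduces
        System.out.println(clamped(1, 1));   // 1: the request asked for exactly 1
    }
}
```

Note the third case: the clamp only avoids *implicitly* degenerate reduces; a caller that explicitly requests a batch size of 1 still gets it.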
Remove unused dead code method

The createMockResult method is defined but never called anywhere in the test class.
This dead code adds confusion and should be removed, or the test
testStreamingConsumerBatchSizes should be expanded to actually use it to verify
meaningful behavior.

server/src/test/java/org/opensearch/action/search/StreamQueryPhaseResultConsumerTests.java [68-91]

-private SearchPhaseResult createMockResult(QuerySearchResult qResult, SearchShardTarget target, int index) {
-    return new SearchPhaseResult() {
-        @Override
-        public QuerySearchResult queryResult() {
-            return qResult;
-        }
+// Remove the unused createMockResult method entirely, or use it in a test.
 
-        @Override
-        public SearchShardTarget getSearchShardTarget() {
-            return target;
-        }
-
-        @Override
-        public void setSearchShardTarget(SearchShardTarget shardTarget) {}
-
-        @Override
-        public int getShardIndex() {
-            return index;
-        }
-
-        @Override
-        public void setShardIndex(int shardIndex) {}
-    };
-}
-
Suggestion importance[1-10]: 3

__

Why: The createMockResult method is defined but never called in the test class, making it dead code. However, this is a minor code quality issue that doesn't affect correctness or functionality.

Low
Document intentional skip of parent reset to prevent confusion

doReset() no longer calls super.doReset() or releases bucketOrds, which means memory
held by bucketOrds is never freed between batches. The parent
TermsAggregator.doReset() likely releases sub-aggregator state and other resources;
skipping it entirely risks memory leaks and incorrect sub-aggregation state. The
comment is also incomplete (cut off mid-sentence), indicating unfinished reasoning.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java [91-97]

 @Override
 public void doReset() {
-    // super.doReset(); // Prevent clearing doc counts which explains why we verify
-    // buckets but 0 doc counts
-    // DO NOT close/null bucketOrds - preserve cumulative bucket state for final
-    // reduction
-    // This keeps all bucket mappings intact across batches so final
+    // Intentionally skip super.doReset() to preserve cumulative bucket state
+    // (bucketOrds and doc counts) across streaming batches. Sub-aggregator reset
+    // is handled by the overridden reset() method which is a no-op for streaming.
+    // bucketOrds is released in doClose().
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion only improves comments/documentation around the intentional skip of super.doReset(). While the incomplete comment is a real issue, this is primarily a documentation improvement with no functional code change.

Low

Previous suggestions

Suggestions up to commit 0092c49
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent exception when overwriting already-set flush mode

SetOnce.set() throws an AlreadySetException if the value has already been set. The
setFlushMode method calls cachedFlushMode.set(flushMode) unconditionally, which will
throw an exception if setFlushModeIfAbsent was already called successfully. This
inconsistency can cause unexpected runtime failures. Consider using a different
field (e.g., a plain AtomicReference) if overwriting is needed, or document that
setFlushMode must only be called when the value is not yet set.

server/src/main/java/org/opensearch/search/DefaultSearchContext.java [1400-1417]

 @Override
 public FlushMode getFlushMode() {
     FlushMode currentMode = cachedFlushMode.get();
     if (currentMode != null) {
         return currentMode;
     }
     // Non-mutating fallback: advertise PER_SEGMENT only if streaming was requested
     return streamingModeRequested ? FlushMode.PER_SEGMENT : null;
 }
 
 @Override
 public boolean setFlushModeIfAbsent(FlushMode flushMode) {
     return cachedFlushMode.trySet(flushMode);
 }
 
 @Override
 public void setFlushMode(FlushMode flushMode) {
-    cachedFlushMode.set(flushMode);
+    // trySet returns false if already set; ignore to avoid AlreadySetException
+    cachedFlushMode.trySet(flushMode);
 }
Suggestion importance[1-10]: 7

__

Why: SetOnce.set() throws AlreadySetException if already set, making setFlushMode potentially throw at runtime if called after setFlushModeIfAbsent. Using trySet instead would prevent this inconsistency.

Medium
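The difference between `SetOnce.set()` (throws `AlreadySetException` on a second call) and a `trySet` that makes the second call a no-op can be sketched with a plain `AtomicReference`. `FlushMode` and the holder class are stand-ins for the real context fields:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of set-once semantics where a second write is silently ignored
// instead of throwing, as the suggestion proposes for setFlushMode.
public class FlushModeHolder {
    enum FlushMode { PER_SEGMENT, PER_SHARD }

    private final AtomicReference<FlushMode> mode = new AtomicReference<>();

    /** Returns true if this call set the value, false if it was already set. */
    boolean trySet(FlushMode m) {
        // compareAndSet from null succeeds for exactly one caller
        return mode.compareAndSet(null, m);
    }

    FlushMode get() { return mode.get(); }

    public static void main(String[] args) {
        FlushModeHolder holder = new FlushModeHolder();
        System.out.println(holder.trySet(FlushMode.PER_SEGMENT)); // true: first write wins
        System.out.println(holder.trySet(FlushMode.PER_SHARD));   // false: no exception, ignored
        System.out.println(holder.get());                         // PER_SEGMENT
    }
}
```

This matches the improved_code's intent: `setFlushMode` delegating to `trySet` keeps the first-writer-wins invariant without the runtime `AlreadySetException` hazard.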
Fix incorrect setting scope causing runtime errors

Settings with IndexScope are index-level settings and cannot be retrieved via
clusterService.getClusterSettings(). The FlushModeResolver.decideFlushMode and
resolve methods appear to be called with values from cluster settings. Changing
these to NodeScope (or ClusterScope) is necessary for them to be accessible as
cluster-level settings, otherwise reading them will throw an exception at runtime.

server/src/main/java/org/opensearch/search/streaming/FlushModeResolver.java [72-78]

 public static final Setting<Long> STREAMING_MAX_ESTIMATED_BUCKET_COUNT = Setting.longSetting(
-    "index.aggregation.streaming.max_estimated_bucket_count",
+    "search.aggregations.streaming.max_estimated_bucket_count",
     100_000L,
     1L,
-    Setting.Property.Dynamic,
-    Setting.Property.IndexScope
+    Setting.Property.NodeScope,
+    Setting.Property.Dynamic
 );
Suggestion importance[1-10]: 7

__

Why: Settings declared with IndexScope cannot be retrieved via clusterService.getClusterSettings(), which would cause a runtime exception. The PR changed the setting key and scope from NodeScope to IndexScope, which is a functional regression.

Medium
Emit remaining documents in final batch

The emitCurrentBatch method clears currentBatch only when isFinal is false, but
emitCurrentBatch is only ever called with isFinal=false inside collect(). The final
batch (remaining documents after the loop) is never emitted because
emitCurrentBatch(true) is never called, causing data loss for the last partial batch
of documents.

server/src/main/java/org/opensearch/search/query/StreamingUnsortedCollectorContext.java [173-196]

 private void emitCurrentBatch(boolean isFinal) {
     if (currentBatch.isEmpty()) return;
 
     try {
-        ...
-        if (!isFinal) {
-            currentBatch.clear();
+        QuerySearchResult partial = new QuerySearchResult();
+        TopDocs topDocs = new TopDocs(
+            new TotalHits(currentBatch.size(), TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
+            currentBatch.toArray(new ScoreDoc[0])
+        );
+        partial.topDocs(new org.opensearch.common.lucene.search.TopDocsAndMaxScore(topDocs, Float.NaN), null);
+        partial.setPartial(!isFinal);
+
+        if (searchContext != null && searchContext.getStreamChannelListener() != null) {
+            searchContext.getStreamChannelListener().onStreamResponse(partial, isFinal);
         }
+
+        currentBatch.clear();
     } catch (Exception e) {
         logger.trace("Failed to emit streaming batch", e);
     }
 }
Suggestion importance[1-10]: 7

__

Why: The emitCurrentBatch is only called with isFinal=false inside collect(), meaning the last partial batch of documents (when currentBatch.size() < batchSize) is never emitted, causing data loss. The postProcess method handles the final TopDocs but the streaming channel never receives the last batch.

Medium
Fix race condition in streaming response completion

The completed flag is checked before sending the batch but only set after
completeStream(). There is a race condition where two threads can both pass the
completed.get() check simultaneously, leading to duplicate batch sends and
potentially duplicate completeStream() calls. Use compareAndSet to atomically guard
the final batch path, and consider using a lock or CAS for the non-final batch path
as well.

server/src/main/java/org/opensearch/action/support/StreamSearchChannelListener.java [53-66]

 public void onStreamResponse(Response response, boolean isLastBatch) {
     assert response != null;
     if (completed.get()) {
-        // Ignore late responses after completion to avoid double-completion and task tracker mismatches
         return;
     }
-    channel.sendResponseBatch(response);
     if (isLastBatch) {
-        try {
-            channel.completeStream();
-        } finally {
-            completed.set(true);
+        if (!completed.compareAndSet(false, true)) {
+            return;
         }
+        channel.sendResponseBatch(response);
+        channel.completeStream();
+    } else {
+        channel.sendResponseBatch(response);
     }
 }
Suggestion importance[1-10]: 7

__

Why: The completed flag check and set are not atomic for the non-final batch path, creating a potential race condition where two threads could both pass the completed.get() check and send duplicate batches. Using compareAndSet for the final batch path would be more robust.
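The compareAndSet guard described above can be sketched with a stand-in channel interface; only the thread that flips `completed` may send the final batch and close the stream, so `completeStream()` runs at most once even under concurrent callers:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the race-free completion guard. Channel is a stand-in for the
// real transport channel, not the OpenSearch interface.
public class StreamCompletion {
    interface Channel { void sendBatch(String response); void completeStream(); }

    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final Channel channel;

    StreamCompletion(Channel channel) { this.channel = channel; }

    void onStreamResponse(String response, boolean isLastBatch) {
        if (completed.get()) return;               // drop late responses
        if (isLastBatch) {
            // exactly one thread wins this CAS and performs completion
            if (!completed.compareAndSet(false, true)) return;
            channel.sendBatch(response);
            channel.completeStream();
        } else {
            channel.sendBatch(response);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger completions = new AtomicInteger();
        StreamCompletion sc = new StreamCompletion(new Channel() {
            public void sendBatch(String r) {}
            public void completeStream() { completions.incrementAndGet(); }
        });
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> sc.onStreamResponse("last", true));
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(completions.get()); // 1: completeStream ran exactly once
    }
}
```

As the "Why" notes, the non-final path is still only guarded by a plain `get()`; fully serializing non-final sends would require a lock or per-batch CAS, at some TTFB cost.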

Medium
Pass task cancellation supplier to streaming consumer

The StreamQueryPhaseResultConsumer path does not receive the isTaskCancelled
supplier, which means task cancellation checks are silently dropped for streaming
searches. This could cause streaming queries to continue executing even after the
task has been cancelled, wasting resources and potentially causing incorrect
behavior.

server/src/main/java/org/opensearch/action/search/SearchPhaseController.java [814-838]

 if (streamingMode != null) {
     return new StreamQueryPhaseResultConsumer(
         request,
         executor,
         circuitBreaker,
         this,
         listener,
         namedWriteableRegistry,
         numShards,
-        onPartialMergeFailure
+        onPartialMergeFailure,
+        isTaskCancelled
     );
 } else {
-    // Regular QueryPhaseResultConsumer
     return new QueryPhaseResultConsumer(
         request,
         executor,
         circuitBreaker,
         this,
         listener,
         namedWriteableRegistry,
         numShards,
         onPartialMergeFailure,
         isTaskCancelled
     );
 }
Suggestion importance[1-10]: 7

__

Why: The StreamQueryPhaseResultConsumer constructor does not receive the isTaskCancelled supplier, which means streaming queries won't respect task cancellation. However, the constructor signature of StreamQueryPhaseResultConsumer may not accept this parameter, so the fix depends on whether the class supports it.

Medium
General
Reduce hot-path logging level to avoid log flooding

Using logger.info for per-batch logging in a hot path (every streaming batch sent)
will produce excessive log output in production and degrade performance. This should
be logger.debug or logger.trace to avoid flooding logs under normal operation.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [171-177]

-logger.info(
-    "Sending batch for requestId [{}], action [{}], items [{}], rows [{}]",
-    task.requestId(),
-    task.action(),
-    task.response(),
-    out.getRoot().getRowCount()
-);
+if (logger.isDebugEnabled()) {
+    logger.debug(
+        "Sending batch for requestId [{}], action [{}], items [{}], rows [{}]",
+        task.requestId(),
+        task.action(),
+        task.response(),
+        out.getRoot().getRowCount()
+    );
+}
Suggestion importance[1-10]: 6

__

Why: Using logger.info for every streaming batch sent is a legitimate performance and operational concern that would flood logs in production. Changing to logger.debug with a guard is a straightforward and impactful improvement.

Low
Fix potential negative active stream counter on fallback

recordFallback decrements currentActiveStreams, but startSearch is the only method
that increments it. If recordFallback is called without a prior startSearch call (or
after recordSuccess/recordFailure already decremented it), the counter will go
negative, producing incorrect metrics. The reason parameter is also silently
ignored. Consider either removing the decrement from recordFallback or documenting
that it must be called instead of (not in addition to) recordSuccess/recordFailure.

server/src/main/java/org/opensearch/search/streaming/StreamingSearchMetrics.java [155-158]

 public void recordFallback(String reason) {
     fallbackToNormalSearches.inc();
+    // Only decrement if this is called as the terminal event (instead of recordSuccess/recordFailure)
+    // Callers must ensure currentActiveStreams is not decremented twice for the same search
     currentActiveStreams.decrementAndGet();
+    if (logger.isDebugEnabled()) {
+        logger.debug("Streaming search fallback recorded, reason: {}", reason);
+    }
 }
Suggestion importance[1-10]: 5

__

Why: The recordFallback method decrements currentActiveStreams which could go negative if called incorrectly, but the improved_code only adds a comment and debug logging without actually fixing the underlying issue. The suggestion identifies a real concern but the fix is incomplete.

Low
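One way to harden the gauge against a double-reported terminal event is a floor-at-zero decrement; the class and method names below are illustrative, not the actual StreamingSearchMetrics fields:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of an active-stream gauge whose decrement can never drive the
// counter negative, even if a terminal event is reported twice.
public class ActiveStreams {
    private final AtomicLong active = new AtomicLong();

    void start()  { active.incrementAndGet(); }

    void finish() {
        // updateAndGet retries on contention; clamps at zero
        active.updateAndGet(n -> Math.max(0, n - 1));
    }

    long current() { return active.get(); }

    public static void main(String[] args) {
        ActiveStreams metrics = new ActiveStreams();
        metrics.start();
        metrics.finish();
        metrics.finish();                     // spurious second terminal event
        System.out.println(metrics.current()); // 0, not -1
    }
}
```

The clamp masks rather than fixes a double-decrement bug, so the caller contract (exactly one terminal event per `start()`) still deserves the documentation the suggestion asks for.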
Rename test to match its actual verified behavior

The test method is named testThrowOnManySegments but the new code removes the
assertion that an exception is thrown and instead asserts the opposite behavior —
that no exception is thrown and the aggregator works correctly. This is a
contradictory test name that will mislead future developers. The test should be
renamed to reflect its actual intent, or the original exception-throwing behavior
should be preserved if it is still expected.

server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java [1357-1371]

-// Verify factory correctly falls back to classic aggregator for multi-segment
-// readers
-// Verify factory creates StreamStringTermsAggregator even for multi-segment
-// readers
-// as we now support it (or at least don't forbid it)
+// Renamed from testThrowOnManySegments to testHandleMultipleSegments
+// Verify factory creates StreamStringTermsAggregator even for multi-segment readers
 assertThat(
     "Factory should create StreamStringTermsAggregator",
     aggregator,
     instanceOf(StreamStringTermsAggregator.class)
 );
 
 // Verify the aggregator works correctly (no exception thrown)
 aggregator.preCollection();
 searcher.search(new MatchAllDocsQuery(), aggregator);
 aggregator.postCollection();
Suggestion importance[1-10]: 5

__

Why: The test testThrowOnManySegments now asserts the opposite behavior (no exception thrown), making the name misleading. Renaming it would improve maintainability and prevent developer confusion.

Low
Distinguish streaming search active state from mode requested

Both isStreamingSearch() and isStreamingModeRequested() have identical
implementations that delegate to getStreamingMode() != null. In
DefaultSearchContext, isStreamingModeRequested() is overridden to use a separate
streamingModeRequested field, but isStreamingSearch() is not overridden and will
always return the same result as isStreamingModeRequested(). If these two methods
are intended to have different semantics (e.g., one checks if mode was requested in
the request, the other checks if streaming is currently active), the base class
defaults should reflect that distinction to avoid subtle bugs in subclasses that
don't override both.

server/src/main/java/org/opensearch/search/internal/SearchContext.java [637-643]

 public boolean isStreamingSearch() {
-    return getStreamingMode() != null;
+    return isStreamSearch();
 }
 
 public boolean isStreamingModeRequested() {
     return getStreamingMode() != null;
 }
Suggestion importance[1-10]: 4

__

Why: Both isStreamingSearch() and isStreamingModeRequested() have identical implementations in the base class, which may cause subtle bugs if subclasses don't override both. However, the improved_code delegates to isStreamSearch() which has different semantics, and this change could introduce inconsistencies in subclasses.

Low
Deduplicate streaming eligibility logic into helper method

The streaming eligibility check (isStreamingCandidate && streamingEnabledEffective)
is duplicated verbatim in both executeSearch and searchAsyncAction, with slightly
different variable names (useStreamingTransportForConnection vs
useStreamingTransport). This duplication risks the two code paths diverging. The
logic should be extracted into a single helper method to ensure consistency.

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java [1150-1156]

-final boolean isStreamingCandidate = (searchRequest.getStreamingSearchMode() != null)
-    && searchRequest.scroll() == null;
-final boolean streamingEnabledSetting = clusterService.getClusterSettings().get(StreamSearchTransportService.STREAM_SEARCH_ENABLED);
-final boolean streamingEnabledEffective = streamingEnabledSetting || FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT);
-final boolean useStreamingTransportForConnection = isStreamingCandidate
-    && streamSearchTransportService != null
-    && streamingEnabledEffective;
+private boolean shouldUseStreamingTransport(SearchRequest searchRequest) {
+    final boolean isStreamingCandidate = (searchRequest.getStreamingSearchMode() != null)
+        && searchRequest.scroll() == null;
+    final boolean streamingEnabledSetting = clusterService.getClusterSettings().get(StreamSearchTransportService.STREAM_SEARCH_ENABLED);
+    final boolean streamingEnabledEffective = streamingEnabledSetting || FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT);
+    return isStreamingCandidate && streamSearchTransportService != null && streamingEnabledEffective;
+}
Suggestion importance[1-10]: 4

__

Why: The duplication of streaming eligibility logic between executeSearch and searchAsyncAction is a valid maintainability concern, but the suggestion's improved_code shows a new helper method that doesn't replace the existing code snippet, making it an additive refactoring suggestion rather than a direct fix.

Low
Document intentional memory retention to avoid confusion

The doReset() method no longer calls super.doReset() or closes bucketOrds, which
means memory held by bucketOrds from previous batches is never released. This is a
memory leak. The comment is also incomplete. If the intent is to preserve state
across batches, this should be clearly documented and the parent's non-destructive
reset logic (if any) should still be invoked.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java [91-97]

 @Override
 public void doReset() {
-    // super.doReset(); // Prevent clearing doc counts which explains why we verify
-    // buckets but 0 doc counts
-    // DO NOT close/null bucketOrds - preserve cumulative bucket state for final
-    // reduction
-    // This keeps all bucket mappings intact across batches so final
+    // Intentionally do NOT call super.doReset() or close bucketOrds.
+    // Streaming batches accumulate bucket state across segments; bucketOrds is
+    // released only in doClose(). Sub-aggregation state is preserved via reset() no-op.
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion is about improving documentation clarity for an intentional design decision. While the incomplete comment could cause confusion, this is a minor style/documentation issue with no functional impact.

Low
Verify symmetric serialization ordering for new fields

The writeToNoId method writes partial and docIds after
getRescoreDocIds().writeTo(out), but readFromWithId reads them after
setRescoreDocIds(new RescoreDocIds(in)). This ordering is consistent. However,
writeToNoId is also called from contexts where the ID is not written first, while
readFromWithId always reads the ID. Verify that the read/write ordering of the new
fields is symmetric across all serialization paths to avoid deserialization
corruption.

server/src/main/java/org/opensearch/search/query/QuerySearchResult.java [383-390]

+// No code change needed if ordering is verified symmetric; add an explicit comment
+// confirming the write order in writeToNoId matches the read order in readFromWithId
 if (in.getVersion().onOrAfter(Version.V_3_3_0)) {
     partial = in.readBoolean();
     if (in.readBoolean()) {
         docIds = in.readList(StreamInput::readInt);
     } else {
         docIds = null;
     }
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion asks to verify ordering symmetry between readFromWithId and writeToNoId, but the improved_code is essentially the same as existing_code with just a comment added. This is a verification suggestion with no actual code change, making it low impact.

Low

@github-actions
Contributor

❌ Gradle check result for 0092c49: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
@github-actions
Contributor

Persistent review updated to latest commit 0998491

@github-actions
Contributor

❌ Gradle check result for 0998491: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Atri Sharma added 3 commits March 19, 2026 20:59
Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
@github-actions
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 560c347.

Path / Line / Severity / Description

• server/src/main/java/org/opensearch/action/ActionModule.java, line 744, medium: Feature flag gate removed: StreamSearchAction is now unconditionally registered without the FeatureFlags.STREAM_TRANSPORT guard. This exposes experimental streaming search functionality to all clusters regardless of whether the feature flag is enabled, potentially bypassing rollout controls for untested code paths.

• server/src/main/java/org/opensearch/rest/action/RestCancellableNodeClient.java, line 103, low: New finishTracking() method removes an HTTP channel from the tracked channels map and clears its associated task set, preventing task cancellation when the channel closes. While intended for streaming completion, this pattern could allow long-running tasks to avoid being cancelled if called prematurely.

• server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java, line 266, low: New 'streaming_mode' and 'stream_scoring_mode' request parameters are accepted directly from user input and used to set internal streaming behavior without validation beyond enum parsing. While not directly exploitable, these undocumented parameters alter search execution paths and could be used to trigger experimental code paths on production clusters.

• server/src/main/java/org/opensearch/search/SearchService.java, line 1006, low: setStreamChannelListener(null) is called with a comment noting no actual listener is set for classic transport, but the context is still marked as streaming. This removes an assertion boundary (previously DefaultSearchContext.setStreamChannelListener asserted isStreamSearch()) and could cause null listener access in streaming code paths reached via classic transport.

The table above displays the top 10 most important findings.

Total: 4 | Critical: 0 | High: 0 | Medium: 1 | Low: 3


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

Status: In Progress

2 participants