Skip to content

Commit 9c62834

Browse files
authored
Adding stream request flag to SearchRequestContext (opensearch-project#20530)
* Update streaming flag to use search request context Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com> * Changelog Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com> --------- Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>
1 parent 6b557db commit 9c62834

File tree

4 files changed

+18
-0
lines changed

4 files changed

+18
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
### Changed
1717
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))
1818
- Use env variable (OPENSEARCH_FIPS_MODE) to enable opensearch to run in FIPS enforced mode instead of checking for existence of bcFIPS jars ([#20625](https://github.com/opensearch-project/OpenSearch/pull/20625))
19+
- Update streaming flag to use search request context ([#20530](https://github.com/opensearch-project/OpenSearch/pull/20530))
1920

2021
### Fixed
2122
- Fix flaky test failures in ShardsLimitAllocationDeciderIT ([#20375](https://github.com/opensearch-project/OpenSearch/pull/20375))

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,10 @@ public final SearchRequest getRequest() {
739739
return request;
740740
}
741741

742+
protected SearchRequestContext getSearchRequestContext() {
743+
return searchRequestContext;
744+
}
745+
742746
protected final SearchResponse buildSearchResponse(
743747
InternalSearchResponse internalSearchResponse,
744748
ShardSearchFailure[] failures,

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class SearchRequestContext {
4343
private final SearchRequest searchRequest;
4444
private final LinkedBlockingQueue<TaskResourceInfo> phaseResourceUsage;
4545
private final Supplier<TaskResourceInfo> taskResourceUsageSupplier;
46+
private boolean streamingRequest;
4647

4748
SearchRequestContext(
4849
final SearchRequestOperationsListener searchRequestOperationsListener,
@@ -156,6 +157,14 @@ void setSuccessfulSearchShardIndices(Set<Index> successfulSearchShardIndices) {
156157
public Set<Index> getSuccessfulSearchShardIndices() {
157158
return successfulSearchShardIndices;
158159
}
160+
161+
void setStreamingRequest(boolean streamingRequest) {
162+
this.streamingRequest = streamingRequest;
163+
}
164+
165+
public boolean isStreamingRequest() {
166+
return streamingRequest;
167+
}
159168
}
160169

161170
enum ShardStatsFieldNames {

server/src/main/java/org/opensearch/action/search/StreamSearchQueryThenFetchAsyncAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ void successfulShardExecution(SearchShardIterator shardsIt) {
154154
try {
155155
shardResultsConsumed.set(true);
156156
if (streamResultsReceived.get() == streamResultsConsumeCallback.get()) {
157+
if (streamResultsReceived.get() > 0) {
158+
getSearchRequestContext().setStreamingRequest(true);
159+
}
157160
getLogger().debug("Stream results consumption has called back, let shard consumption callback trigger onPhaseDone");
158161
onPhaseDone();
159162
} else {
@@ -180,6 +183,7 @@ private void successfulStreamExecution() {
180183
try {
181184
if (streamResultsReceived.get() == streamResultsConsumeCallback.incrementAndGet()) {
182185
if (shardResultsConsumed.get()) {
186+
getSearchRequestContext().setStreamingRequest(true);
183187
getLogger().debug("Stream consumption trigger onPhaseDone");
184188
onPhaseDone();
185189
}

0 commit comments

Comments
 (0)