Skip to content

Commit 62768a0

Browse files
Atri SharmaAtri Sharma
authored andcommitted
And more stuff
Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
1 parent 560c347 commit 62768a0

File tree

13 files changed

+41
-251
lines changed

13 files changed

+41
-251
lines changed

server/src/main/java/org/opensearch/action/search/SearchPhaseController.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,7 @@ static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
246246
}
247247

248248
static void setShardIndex(TopDocs topDocs, int shardIndex) {
249-
// Idempotent assignment: in streaming flows partial reductions may touch the same TopDocs more than once.
250-
if (topDocs.scoreDocs.length == 0) {
251-
return;
252-
}
253-
if (topDocs.scoreDocs[0].shardIndex != -1) {
254-
// Already set by a previous pass; avoid reassigning to prevent assertion failures
255-
return;
256-
}
249+
assert topDocs.scoreDocs.length == 0 || topDocs.scoreDocs[0].shardIndex == -1 : "shardIndex is already set";
257250
for (ScoreDoc doc : topDocs.scoreDocs) {
258251
doc.shardIndex = shardIndex;
259252
}

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public StreamQueryPhaseResultConsumer(
4848

4949
@Override
5050
int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
51-
// Reduce immediately for fastest TTFB
5251
return Math.min(requestBatchedReduceSize, 1);
5352
}
5453
}

server/src/main/java/org/opensearch/action/search/StreamSearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
*/
2929
public class StreamSearchQueryThenFetchAsyncAction extends SearchQueryThenFetchAsyncAction {
3030

31-
private final Logger logger;
32-
3331
StreamSearchQueryThenFetchAsyncAction(
3432
Logger logger,
3533
SearchTransportService searchTransportService,
@@ -70,7 +68,6 @@ public class StreamSearchQueryThenFetchAsyncAction extends SearchQueryThenFetchA
7068
searchRequestContext,
7169
tracer
7270
);
73-
this.logger = logger;
7471
}
7572

7673
@Override
@@ -87,10 +84,7 @@ SearchActionListener<SearchPhaseResult> createShardActionListener(
8784
@Override
8885
protected void innerOnStreamResponse(SearchPhaseResult result) {
8986
try {
90-
if (getLogger().isTraceEnabled()) {
91-
getLogger().trace("coordinator received partial from shard {}", shard);
92-
}
93-
onStreamResult(result, shardIt, () -> successfulStreamExecution());
87+
onStreamResult(result, shardIt, () -> {});
9488
} finally {
9589
executeNext(pendingExecutions, thread);
9690
}
@@ -99,9 +93,6 @@ protected void innerOnStreamResponse(SearchPhaseResult result) {
9993
@Override
10094
protected void innerOnCompleteResponse(SearchPhaseResult result) {
10195
try {
102-
if (getLogger().isTraceEnabled()) {
103-
getLogger().trace("coordinator received final for shard {}", shard);
104-
}
10596
onShardResult(result, shardIt);
10697
} finally {
10798
executeNext(pendingExecutions, thread);
@@ -111,8 +102,6 @@ protected void innerOnCompleteResponse(SearchPhaseResult result) {
111102
@Override
112103
public void onFailure(Exception t) {
113104
try {
114-
// It only happens when onPhaseDone() is called and executePhaseOnShard() fails
115-
// hard with an exception.
116105
if (totalOps.get() == expectedTotalOps) {
117106
onPhaseFailure(phase, "The phase has failed", t);
118107
} else {
@@ -131,31 +120,10 @@ public void onFailure(Exception t) {
131120
protected void onStreamResult(SearchPhaseResult result, SearchShardIterator shardIt, Runnable next) {
132121
assert result.getShardIndex() != -1 : "shard index is not set";
133122
assert result.getSearchShardTarget() != null : "search shard target must not be null";
134-
if (getLogger().isTraceEnabled()) {
135-
getLogger().trace("got streaming result from {}", result != null ? result.getSearchShardTarget() : null);
136-
}
137123
this.setPhaseResourceUsages();
138-
if (result.queryResult() != null) {
139-
result.queryResult().setPartial(true);
140-
}
141124
results.consumeResult(result, next);
142125
}
143126

144-
@Override
145-
protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) {
146-
// Trace final shard responses to diagnose coordinator sequencing.
147-
if (logger.isTraceEnabled()) {
148-
logger.trace(
149-
"COORDINATOR: received final shard result from shard={}, target={}, totalOps={}, expectedOps={}",
150-
result.getShardIndex(),
151-
result.getSearchShardTarget(),
152-
totalOps.get(),
153-
expectedTotalOps
154-
);
155-
}
156-
super.onShardResult(result, shardIt);
157-
}
158-
159127
@Override
160128
void successfulShardExecution(SearchShardIterator shardsIt) {
161129
final int remainingOpsOnIterator;
@@ -167,7 +135,6 @@ void successfulShardExecution(SearchShardIterator shardsIt) {
167135
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
168136
if (xTotalOps == expectedTotalOps) {
169137
try {
170-
// All final shard results have been processed; partials are not reduced.
171138
onPhaseDone();
172139
} catch (final Exception ex) {
173140
onPhaseFailure(this, "The phase has failed", ex);
@@ -180,8 +147,4 @@ void successfulShardExecution(SearchShardIterator shardsIt) {
180147
}
181148
}
182149

183-
private void successfulStreamExecution() {
184-
// No-op.
185-
}
186-
187150
}

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 29 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.opensearch.common.settings.Setting;
6161
import org.opensearch.common.settings.Setting.Property;
6262
import org.opensearch.common.unit.TimeValue;
63-
import org.opensearch.common.util.FeatureFlags;
6463
import org.opensearch.common.util.concurrent.AtomicArray;
6564
import org.opensearch.common.util.concurrent.CountDown;
6665
import org.opensearch.core.action.ActionListener;
@@ -87,7 +86,6 @@
8786
import org.opensearch.search.pipeline.SearchPipelineService;
8887
import org.opensearch.search.profile.ProfileShardResult;
8988
import org.opensearch.search.profile.SearchProfileShardResults;
90-
import org.opensearch.search.query.StreamingSearchMode;
9189
import org.opensearch.search.slice.SliceBuilder;
9290
import org.opensearch.tasks.CancellableTask;
9391
import org.opensearch.tasks.Task;
@@ -103,6 +101,7 @@
103101
import org.opensearch.transport.RemoteClusterAware;
104102
import org.opensearch.transport.RemoteClusterService;
105103
import org.opensearch.transport.RemoteTransportException;
104+
import org.opensearch.transport.StreamTransportService;
106105
import org.opensearch.transport.Transport;
107106
import org.opensearch.transport.TransportService;
108107
import org.opensearch.transport.client.Client;
@@ -175,8 +174,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
175174
private final ThreadPool threadPool;
176175
final ClusterService clusterService;
177176
final SearchTransportService searchTransportService;
178-
@Nullable
179-
final StreamSearchTransportService streamSearchTransportService;
180177
private final RemoteClusterService remoteClusterService;
181178
final SearchPhaseController searchPhaseController;
182179
private final SearchService searchService;
@@ -198,10 +195,8 @@ public TransportSearchAction(
198195
ThreadPool threadPool,
199196
CircuitBreakerService circuitBreakerService,
200197
TransportService transportService,
201-
@Nullable org.opensearch.transport.StreamTransportService streamTransportService,
202198
SearchService searchService,
203199
SearchTransportService searchTransportService,
204-
@Nullable StreamSearchTransportService streamSearchTransportService,
205200
SearchPhaseController searchPhaseController,
206201
ClusterService clusterService,
207202
ActionFilters actionFilters,
@@ -220,13 +215,12 @@ public TransportSearchAction(
220215
this.circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
221216
this.searchPhaseController = searchPhaseController;
222217
this.searchTransportService = searchTransportService;
223-
this.streamSearchTransportService = streamSearchTransportService;
224218
this.remoteClusterService = searchTransportService.getRemoteClusterService();
225-
// Register request handlers - always classic; add streaming if available
226-
if (streamTransportService != null && streamSearchTransportService != null) {
227-
StreamSearchTransportService.registerStreamRequestHandler(streamTransportService, searchService);
219+
if (transportService instanceof StreamTransportService) {
220+
StreamSearchTransportService.registerStreamRequestHandler((StreamTransportService) transportService, searchService);
221+
} else {
222+
SearchTransportService.registerRequestHandler(transportService, searchService);
228223
}
229-
SearchTransportService.registerRequestHandler(transportService, searchService);
230224
this.clusterService = clusterService;
231225
this.searchService = searchService;
232226
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -1147,21 +1141,11 @@ private void executeSearch(
11471141
}
11481142
}
11491143
final DiscoveryNodes nodes = clusterState.nodes();
1150-
final boolean isStreamingCandidate = (searchRequest.getStreamingSearchMode() != null)
1151-
&& searchRequest.scroll() == null;
1152-
final boolean streamingEnabledSetting = clusterService.getClusterSettings().get(StreamSearchTransportService.STREAM_SEARCH_ENABLED);
1153-
final boolean streamingEnabledEffective = streamingEnabledSetting || FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT);
1154-
final boolean useStreamingTransportForConnection = isStreamingCandidate
1155-
&& streamSearchTransportService != null
1156-
&& streamingEnabledEffective;
1157-
final SearchTransportService connectionTransport = useStreamingTransportForConnection
1158-
? streamSearchTransportService
1159-
: searchTransportService;
11601144
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(
11611145
searchRequest.getLocalClusterAlias(),
11621146
nodes::get,
11631147
remoteConnections,
1164-
connectionTransport::getConnection
1148+
searchTransportService::getConnection
11651149
);
11661150
final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState);
11671151
final boolean preFilterSearchShards = shouldPreFilterSearchShards(
@@ -1290,31 +1274,10 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(
12901274
SearchResponse.Clusters clusters,
12911275
SearchRequestContext searchRequestContext
12921276
) {
1293-
// Determine if this request should use streaming transport
1294-
final boolean isStreamingCandidate = (searchRequest.getStreamingSearchMode() != null)
1295-
&& searchRequest.scroll() == null;
1296-
1297-
// Check if streaming transport is actually available and enabled (cluster setting OR feature flag)
1298-
final boolean streamingEnabledSetting = clusterService.getClusterSettings().get(StreamSearchTransportService.STREAM_SEARCH_ENABLED);
1299-
final boolean streamingEnabledEffective = streamingEnabledSetting || FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT);
1300-
final boolean canUseStreamingTransport = (streamSearchTransportService != null) && streamingEnabledEffective;
1301-
1302-
// Use streaming transport for streaming search requests
1303-
final boolean useStreamingTransport = isStreamingCandidate && canUseStreamingTransport;
1304-
13051277
if (preFilter) {
1306-
if (logger.isTraceEnabled()) {
1307-
logger.trace(
1308-
"prefilter using transport [{}] (streaming={}, enabled={}, canUse={})",
1309-
((isStreamingCandidate && canUseStreamingTransport) ? "stream" : "classic"),
1310-
isStreamingCandidate,
1311-
streamingEnabledSetting,
1312-
canUseStreamingTransport
1313-
);
1314-
}
13151278
return new CanMatchPreFilterSearchPhase(
13161279
logger,
1317-
(isStreamingCandidate && useStreamingTransport) ? streamSearchTransportService : searchTransportService,
1280+
searchTransportService,
13181281
connectionLookup,
13191282
aliasFilter,
13201283
concreteIndexBoosts,
@@ -1350,34 +1313,21 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(
13501313
tracer
13511314
);
13521315
} else {
1353-
final boolean isStreamingRequest = (searchRequest.getStreamingSearchMode() != null);
1354-
1355-
final SearchProgressListener progressListener = task.getProgressListener();
1356-
13571316
final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(
13581317
executor,
13591318
circuitBreaker,
1360-
progressListener,
1319+
task.getProgressListener(),
13611320
searchRequest,
13621321
shardIterators.size(),
13631322
exc -> cancelTask(task, exc),
13641323
task::isCancelled
13651324
);
1366-
if (logger.isTraceEnabled()) {
1367-
logger.trace(
1368-
"query phase using transport [{}] (streamingRequest={}, enabled={}, canUse={})",
1369-
((isStreamingRequest && useStreamingTransport) ? "stream" : "classic"),
1370-
isStreamingRequest,
1371-
streamingEnabledSetting,
1372-
canUseStreamingTransport
1373-
);
1374-
}
13751325
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;
13761326
switch (searchRequest.searchType()) {
13771327
case DFS_QUERY_THEN_FETCH:
13781328
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(
13791329
logger,
1380-
(isStreamingRequest && canUseStreamingTransport) ? streamSearchTransportService : searchTransportService,
1330+
searchTransportService,
13811331
connectionLookup,
13821332
aliasFilter,
13831333
concreteIndexBoosts,
@@ -1397,49 +1347,26 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(
13971347
);
13981348
break;
13991349
case QUERY_THEN_FETCH:
1400-
if (isStreamingRequest && canUseStreamingTransport) {
1401-
searchAsyncAction = new StreamSearchQueryThenFetchAsyncAction(
1402-
logger,
1403-
streamSearchTransportService,
1404-
connectionLookup,
1405-
aliasFilter,
1406-
concreteIndexBoosts,
1407-
indexRoutings,
1408-
searchPhaseController,
1409-
executor,
1410-
queryResultConsumer,
1411-
searchRequest,
1412-
listener,
1413-
shardIterators,
1414-
timeProvider,
1415-
clusterState,
1416-
task,
1417-
clusters,
1418-
searchRequestContext,
1419-
tracer
1420-
);
1421-
} else {
1422-
searchAsyncAction = new SearchQueryThenFetchAsyncAction(
1423-
logger,
1424-
searchTransportService,
1425-
connectionLookup,
1426-
aliasFilter,
1427-
concreteIndexBoosts,
1428-
indexRoutings,
1429-
searchPhaseController,
1430-
executor,
1431-
queryResultConsumer,
1432-
searchRequest,
1433-
listener,
1434-
shardIterators,
1435-
timeProvider,
1436-
clusterState,
1437-
task,
1438-
clusters,
1439-
searchRequestContext,
1440-
tracer
1441-
);
1442-
}
1350+
searchAsyncAction = new SearchQueryThenFetchAsyncAction(
1351+
logger,
1352+
searchTransportService,
1353+
connectionLookup,
1354+
aliasFilter,
1355+
concreteIndexBoosts,
1356+
indexRoutings,
1357+
searchPhaseController,
1358+
executor,
1359+
queryResultConsumer,
1360+
searchRequest,
1361+
listener,
1362+
shardIterators,
1363+
timeProvider,
1364+
clusterState,
1365+
task,
1366+
clusters,
1367+
searchRequestContext,
1368+
tracer
1369+
);
14431370
break;
14441371
default:
14451372
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
151151
parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)
152152
);
153153

154-
// Honor streaming when cluster setting is enabled OR feature flag is enabled
155-
final boolean streamingEnabledEffective = (clusterSettings != null && clusterSettings.get(STREAM_SEARCH_ENABLED))
156-
|| FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT);
157-
if (streamingEnabledEffective) {
154+
if (clusterSettings != null && clusterSettings.get(STREAM_SEARCH_ENABLED)) {
158155
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
159156
if (canUseStreamSearch(searchRequest)) {
160157
String scoringMode = request.param("stream_scoring_mode");
161158
if (scoringMode != null) {
162159
searchRequest.setStreamingSearchMode(scoringMode);
163160
}
164-
// StreamSearchAction should always execute with an explicit mode.
165161
if (searchRequest.getStreamingSearchMode() == null) {
166162
searchRequest.setStreamingSearchMode("no_scoring");
167163
}
@@ -172,7 +168,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
172168
} else {
173169
logger.debug("Stream search requested but search contains unsupported aggregations. Falling back to normal search.");
174170
}
175-
} else {
171+
} else if (searchRequest.getStreamingSearchMode() != null || request.hasParam("stream_scoring_mode")) {
176172
throw new IllegalArgumentException("You need to enable stream transport first to use stream search.");
177173
}
178174
}
@@ -182,14 +178,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
182178
};
183179
}
184180

185-
/**
186-
* Factory method for creating RestCancellableNodeClient instances.
187-
* This method is protected to allow tests to override it and inject spyable clients.
188-
*
189-
* @param client the NodeClient to wrap
190-
* @param httpChannel the HTTP channel for cancellation tracking
191-
* @return a RestCancellableNodeClient instance
192-
*/
193181
protected RestCancellableNodeClient createRestCancellableNodeClient(NodeClient client, HttpChannel httpChannel) {
194182
return new RestCancellableNodeClient(client, httpChannel);
195183
}

0 commit comments

Comments
 (0)