Skip to content

Commit cd63e4b

Browse files
Atri SharmaAtri Sharma
authored andcommitted
Descoping to single level
Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
1 parent 62768a0 commit cd63e4b

File tree

12 files changed

+107
-347
lines changed

12 files changed

+107
-347
lines changed

server/src/main/java/org/opensearch/action/search/StreamSearchQueryThenFetchAsyncAction.java

Lines changed: 0 additions & 150 deletions
This file was deleted.

server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,11 @@ public StreamSearchTransportService(
5959

6060
@Override
6161
public Transport.Connection getConnection(@Nullable String clusterAlias, DiscoveryNode node) {
62-
// Delegate to StreamTransportService to get connections from the streaming connection manager.
63-
// This ensures connections understand the streaming protocol and call handleStreamResponse()
64-
// instead of handleResponse() on StreamTransportResponseHandler instances.
65-
return transportService.getConnection(node);
62+
if (clusterAlias == null) {
63+
return transportService.getConnection(node);
64+
} else {
65+
return transportService.getRemoteClusterService().getConnection(node, clusterAlias);
66+
}
6667
}
6768

6869
public static final Setting<Boolean> STREAM_SEARCH_ENABLED = Setting.boolSetting(
@@ -93,7 +94,7 @@ public static void registerStreamRequestHandler(StreamTransportService transport
9394
request,
9495
false,
9596
(SearchShardTask) task,
96-
new StreamSearchChannelListener<>(channel, QUERY_ACTION_NAME, request),
97+
new StreamSearchChannelListener<>(channel, QUERY_ACTION_NAME),
9798
ThreadPool.Names.STREAM_SEARCH,
9899
isStreamSearch
99100
);
@@ -110,7 +111,7 @@ public static void registerStreamRequestHandler(StreamTransportService transport
110111
searchService.executeFetchPhase(
111112
request,
112113
(SearchShardTask) task,
113-
new StreamSearchChannelListener<>(channel, FETCH_ID_ACTION_NAME, request),
114+
new StreamSearchChannelListener<>(channel, FETCH_ID_ACTION_NAME),
114115
ThreadPool.Names.STREAM_SEARCH
115116
);
116117
}
@@ -120,7 +121,7 @@ public static void registerStreamRequestHandler(StreamTransportService transport
120121
ThreadPool.Names.SAME,
121122
ShardSearchRequest::new,
122123
(request, channel, task) -> {
123-
searchService.canMatch(request, new StreamSearchChannelListener<>(channel, QUERY_CAN_MATCH_NAME, request));
124+
searchService.canMatch(request, new StreamSearchChannelListener<>(channel, QUERY_CAN_MATCH_NAME));
124125
}
125126
);
126127
transportService.registerRequestHandler(
@@ -145,7 +146,7 @@ public static void registerStreamRequestHandler(StreamTransportService transport
145146
request,
146147
false,
147148
(SearchShardTask) task,
148-
new StreamSearchChannelListener<>(channel, DFS_ACTION_NAME, request),
149+
new StreamSearchChannelListener<>(channel, DFS_ACTION_NAME),
149150
ThreadPool.Names.STREAM_SEARCH
150151
)
151152
);

server/src/main/java/org/opensearch/action/support/StreamSearchChannelListener.java

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,74 +14,52 @@
1414
import org.opensearch.core.action.ActionListener;
1515
import org.opensearch.core.transport.TransportResponse;
1616
import org.opensearch.transport.TransportChannel;
17-
import org.opensearch.transport.TransportRequest;
1817

1918
import java.io.IOException;
2019

21-
/**
22-
* A listener that sends the response back to the channel in streaming fashion.
23-
*
24-
* - onStreamResponse(): Send streaming responses
25-
* - onResponse(): Standard ActionListener method that send last stream response
26-
* - onFailure(): Handle errors and complete the stream
27-
*/
20+
/** Streams transport responses through a {@link TransportChannel}. */
2821
@ExperimentalApi
29-
public class StreamSearchChannelListener<Response extends TransportResponse, Request extends TransportRequest>
22+
public class StreamSearchChannelListener<Response extends TransportResponse>
3023
implements
3124
ActionListener<Response> {
3225

3326
private static final Logger logger = LogManager.getLogger(StreamSearchChannelListener.class);
3427
private final TransportChannel channel;
35-
private final Request request;
3628
private final String actionName;
3729

3830
private final java.util.concurrent.atomic.AtomicBoolean completed = new java.util.concurrent.atomic.AtomicBoolean(false);
3931

40-
public StreamSearchChannelListener(TransportChannel channel, String actionName, Request request) {
32+
public StreamSearchChannelListener(TransportChannel channel, String actionName) {
4133
this.channel = channel;
42-
this.request = request;
4334
this.actionName = actionName;
4435
}
4536

46-
/**
47-
* Send streaming responses
48-
* This allows multiple responses to be sent for a single request.
49-
*
50-
* @param response the intermediate response to send
51-
* @param isLastBatch whether this response is the last one
52-
*/
37+
/** Sends a streamed response batch and optionally completes the stream. */
5338
public void onStreamResponse(Response response, boolean isLastBatch) {
5439
assert response != null;
5540
if (completed.get()) {
56-
// Ignore late responses after completion to avoid double-completion and task tracker mismatches
5741
return;
5842
}
59-
channel.sendResponseBatch(response);
60-
if (isLastBatch) {
61-
try {
43+
try {
44+
channel.sendResponseBatch(response);
45+
if (isLastBatch) {
6246
channel.completeStream();
63-
} finally {
6447
completed.set(true);
6548
}
49+
} catch (Exception e) {
50+
logger.warn("Failed to send streaming response on channel for action [{}]", actionName, e);
51+
throw e;
6652
}
6753
}
6854

69-
/**
70-
* Reuse ActionListener method to send the last stream response
71-
* This maintains compatibility on data node side
72-
*
73-
* @param response the response to send
74-
*/
7555
@Override
7656
public final void onResponse(Response response) {
7757
onStreamResponse(response, true);
7858
}
7959

8060
@Override
8161
public void onFailure(Exception e) {
82-
// Ensure we only fail once per request/channel to keep task tracker consistent
8362
if (completed.getAndSet(true)) {
84-
// Already completed (success or failure); drop duplicate failure
8563
return;
8664
}
8765
try {

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -887,8 +887,7 @@ public void apply(Settings value, Settings current, Settings previous) {
887887
ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
888888
ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
889889
ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER,
890-
StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING,
891-
StreamSearchTransportService.STREAM_SEARCH_ENABLED
890+
StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING
892891
)
893892
)
894893
);
@@ -909,6 +908,10 @@ public void apply(Settings value, Settings current, Settings previous) {
909908
TelemetrySettings.METRICS_PUBLISH_INTERVAL_SETTING,
910909
TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING,
911910
TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING
911+
),
912+
List.of(FeatureFlags.STREAM_TRANSPORT),
913+
List.of(
914+
StreamSearchTransportService.STREAM_SEARCH_ENABLED
912915
)
913916
);
914917
}

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -151,26 +151,28 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
151151
parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)
152152
);
153153

154-
if (clusterSettings != null && clusterSettings.get(STREAM_SEARCH_ENABLED)) {
155-
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
156-
if (canUseStreamSearch(searchRequest)) {
157-
String scoringMode = request.param("stream_scoring_mode");
158-
if (scoringMode != null) {
159-
searchRequest.setStreamingSearchMode(scoringMode);
160-
}
161-
if (searchRequest.getStreamingSearchMode() == null) {
162-
searchRequest.setStreamingSearchMode("no_scoring");
163-
}
164-
return channel -> {
165-
RestCancellableNodeClient cancelClient = createRestCancellableNodeClient(client, request.getHttpChannel());
166-
cancelClient.execute(StreamSearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
167-
};
168-
} else {
169-
logger.debug("Stream search requested but search contains unsupported aggregations. Falling back to normal search.");
170-
}
171-
} else if (searchRequest.getStreamingSearchMode() != null || request.hasParam("stream_scoring_mode")) {
154+
if (searchRequest.getStreamingSearchMode() != null || request.hasParam("stream_scoring_mode")) {
155+
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT) == false) {
172156
throw new IllegalArgumentException("You need to enable stream transport first to use stream search.");
173157
}
158+
if (clusterSettings == null || clusterSettings.get(STREAM_SEARCH_ENABLED) == false) {
159+
throw new IllegalArgumentException("Stream search is disabled. Enable [stream.search.enabled] to use stream search.");
160+
}
161+
if (canUseStreamSearch(searchRequest)) {
162+
String scoringMode = request.param("stream_scoring_mode");
163+
if (scoringMode != null) {
164+
searchRequest.setStreamingSearchMode(scoringMode);
165+
}
166+
if (searchRequest.getStreamingSearchMode() == null) {
167+
searchRequest.setStreamingSearchMode("no_scoring");
168+
}
169+
return channel -> {
170+
RestCancellableNodeClient cancelClient = createRestCancellableNodeClient(client, request.getHttpChannel());
171+
cancelClient.execute(StreamSearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
172+
};
173+
} else {
174+
logger.debug("Stream search requested but search contains unsupported aggregations. Falling back to normal search.");
175+
}
174176
}
175177
return channel -> {
176178
RestCancellableNodeClient cancelClient = createRestCancellableNodeClient(client, request.getHttpChannel());

0 commit comments

Comments
 (0)