Skip to content

Commit 427b594

Browse files
Cleanup listener wrapping in TransportSearchAction a little (#116184)
Lets avoid parsing settings during search request processing, remove unused field from the listener and using the delegating listener class.
1 parent aa99392 commit 427b594

File tree

1 file changed

+18
-23
lines changed

1 file changed

+18
-23
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.ActionListener;
1717
import org.elasticsearch.action.ActionListenerResponseHandler;
1818
import org.elasticsearch.action.ActionType;
19+
import org.elasticsearch.action.DelegatingActionListener;
1920
import org.elasticsearch.action.IndicesRequest;
2021
import org.elasticsearch.action.OriginalIndices;
2122
import org.elasticsearch.action.RemoteClusterActionType;
@@ -52,7 +53,6 @@
5253
import org.elasticsearch.common.regex.Regex;
5354
import org.elasticsearch.common.settings.Setting;
5455
import org.elasticsearch.common.settings.Setting.Property;
55-
import org.elasticsearch.common.settings.Settings;
5656
import org.elasticsearch.common.util.ArrayUtils;
5757
import org.elasticsearch.common.util.CollectionUtils;
5858
import org.elasticsearch.common.util.Maps;
@@ -160,7 +160,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
160160
private final SearchResponseMetrics searchResponseMetrics;
161161
private final Client client;
162162
private final UsageService usageService;
163-
private final Settings settings;
163+
private final boolean collectTelemetry;
164164

165165
@Inject
166166
public TransportSearchAction(
@@ -193,9 +193,10 @@ public TransportSearchAction(
193193
this.indexNameExpressionResolver = indexNameExpressionResolver;
194194
this.namedWriteableRegistry = namedWriteableRegistry;
195195
this.executorSelector = executorSelector;
196-
this.settings = clusterService.getSettings();
196+
var settings = clusterService.getSettings();
197197
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(settings);
198198
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(settings);
199+
this.collectTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings);
199200
this.searchResponseMetrics = searchResponseMetrics;
200201
this.client = client;
201202
this.usageService = usageService;
@@ -313,12 +314,7 @@ public long buildTookInMillis() {
313314

314315
@Override
315316
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
316-
executeRequest(
317-
(SearchTask) task,
318-
searchRequest,
319-
new SearchResponseActionListener((SearchTask) task, listener),
320-
AsyncSearchActionProvider::new
321-
);
317+
executeRequest((SearchTask) task, searchRequest, new SearchResponseActionListener(listener), AsyncSearchActionProvider::new);
322318
}
323319

324320
void executeRequest(
@@ -372,7 +368,7 @@ void executeRequest(
372368
searchPhaseProvider.apply(delegate)
373369
);
374370
} else {
375-
if (listener instanceof TelemetryListener tl) {
371+
if (delegate instanceof TelemetryListener tl) {
376372
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size());
377373
if (task.isAsync()) {
378374
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
@@ -398,7 +394,7 @@ void executeRequest(
398394
}
399395
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
400396
if (shouldMinimizeRoundtrips(rewritten)) {
401-
if (listener instanceof TelemetryListener tl) {
397+
if (delegate instanceof TelemetryListener tl) {
402398
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE);
403399
}
404400
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
@@ -508,21 +504,21 @@ void executeRequest(
508504
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
509505
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
510506
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
511-
var pitListener = new SearchResponseActionListener(task, listener) {
507+
var pitListener = new SearchResponseActionListener(delegate) {
512508
@Override
513509
public void onResponse(SearchResponse response) {
514510
// we need to close the PIT first so we delay the release of the response to after the closing
515511
response.incRef();
516512
closePIT(
517513
client,
518514
original.source().pointInTimeBuilder(),
519-
() -> ActionListener.respondAndRelease(listener, response)
515+
() -> ActionListener.respondAndRelease(delegate, response)
520516
);
521517
}
522518

523519
@Override
524520
public void onFailure(Exception e) {
525-
closePIT(client, original.source().pointInTimeBuilder(), () -> listener.onFailure(e));
521+
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
526522
}
527523
};
528524
executeRequest(task, original, pitListener, searchPhaseProvider);
@@ -1874,14 +1870,13 @@ private interface TelemetryListener {
18741870
void setClient(String client);
18751871
}
18761872

1877-
private class SearchResponseActionListener implements ActionListener<SearchResponse>, TelemetryListener {
1878-
private final SearchTask task;
1879-
private final ActionListener<SearchResponse> listener;
1873+
private class SearchResponseActionListener extends DelegatingActionListener<SearchResponse, SearchResponse>
1874+
implements
1875+
TelemetryListener {
18801876
private final CCSUsage.Builder usageBuilder;
18811877

1882-
SearchResponseActionListener(SearchTask task, ActionListener<SearchResponse> listener) {
1883-
this.task = task;
1884-
this.listener = listener;
1878+
SearchResponseActionListener(ActionListener<SearchResponse> listener) {
1879+
super(listener);
18851880
if (listener instanceof SearchResponseActionListener srListener) {
18861881
usageBuilder = srListener.usageBuilder;
18871882
} else {
@@ -1893,7 +1888,7 @@ private class SearchResponseActionListener implements ActionListener<SearchRespo
18931888
* Should we collect telemetry for this search?
18941889
*/
18951890
private boolean collectTelemetry() {
1896-
return SearchService.CCS_COLLECT_TELEMETRY.get(settings) && usageBuilder.getRemotesCount() > 0;
1891+
return collectTelemetry && usageBuilder.getRemotesCount() > 0;
18971892
}
18981893

18991894
public void setRemotes(int count) {
@@ -1942,7 +1937,7 @@ public void onResponse(SearchResponse searchResponse) {
19421937
return;
19431938
}
19441939
// This is last because we want to collect telemetry before returning the response.
1945-
listener.onResponse(searchResponse);
1940+
delegate.onResponse(searchResponse);
19461941
}
19471942

19481943
@Override
@@ -1952,7 +1947,7 @@ public void onFailure(Exception e) {
19521947
usageBuilder.setFailure(e);
19531948
recordTelemetry();
19541949
}
1955-
listener.onFailure(e);
1950+
super.onFailure(e);
19561951
}
19571952

19581953
private void recordTelemetry() {

0 commit comments

Comments
 (0)