Skip to content

Commit 42d1e52

Browse files
Fix/cleanup two spots in open-PIT request handling (#116553) (#117430)
Two things fixed here: 1. Don't fork just to send the response, it's unnecessary. Serializing the ID might take a little time but if it's really an issue we should optimize it rather than forking just to send a single response. 2. Handle finding a connection cleanly, don't allow the exception to bubble up and fail the phase, this may cause leaks.
1 parent db3ccdd commit 42d1e52

File tree

1 file changed

+9
-29
lines changed

1 file changed

+9
-29
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2929
import org.elasticsearch.common.io.stream.StreamInput;
3030
import org.elasticsearch.common.io.stream.StreamOutput;
31-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3231
import org.elasticsearch.common.util.concurrent.EsExecutors;
3332
import org.elasticsearch.core.TimeValue;
3433
import org.elasticsearch.index.shard.ShardId;
@@ -257,16 +256,17 @@ protected void executePhaseOnShard(
257256
SearchShardTarget shard,
258257
SearchActionListener<SearchPhaseResult> phaseListener
259258
) {
260-
final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(
261-
shardIt.shardId(),
262-
shardIt.getOriginalIndices(),
263-
pitRequest.keepAlive()
264-
);
265-
Transport.Connection connection = connectionLookup.apply(shardIt.getClusterAlias(), shard.getNodeId());
259+
final Transport.Connection connection;
260+
try {
261+
connection = connectionLookup.apply(shardIt.getClusterAlias(), shard.getNodeId());
262+
} catch (Exception e) {
263+
phaseListener.onFailure(e);
264+
return;
265+
}
266266
transportService.sendChildRequest(
267267
connection,
268268
OPEN_SHARD_READER_CONTEXT_NAME,
269-
shardRequest,
269+
new ShardOpenReaderRequest(shardIt.shardId(), shardIt.getOriginalIndices(), pitRequest.keepAlive()),
270270
task,
271271
new ActionListenerResponseHandler<>(
272272
phaseListener,
@@ -279,29 +279,9 @@ protected void executePhaseOnShard(
279279
@Override
280280
protected SearchPhase getNextPhase() {
281281
return new SearchPhase(getName()) {
282-
283-
private void onExecuteFailure(Exception e) {
284-
onPhaseFailure(this, "sending response failed", e);
285-
}
286-
287282
@Override
288283
public void run() {
289-
execute(new AbstractRunnable() {
290-
@Override
291-
public void onFailure(Exception e) {
292-
onExecuteFailure(e);
293-
}
294-
295-
@Override
296-
protected void doRun() {
297-
sendSearchResponse(SearchResponseSections.EMPTY_WITH_TOTAL_HITS, results.getAtomicArray());
298-
}
299-
300-
@Override
301-
public boolean isForceExecution() {
302-
return true; // we already created the PIT, no sense in rejecting the task that sends the response.
303-
}
304-
});
284+
sendSearchResponse(SearchResponseSections.EMPTY_WITH_TOTAL_HITS, results.getAtomicArray());
305285
}
306286
};
307287
}

0 commit comments

Comments
 (0)