Skip to content

Commit 72f8845

Browse files
fixes
1 parent c659106 commit 72f8845

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
2727
import org.elasticsearch.action.bulk.TransportBulkAction;
2828
import org.elasticsearch.action.index.TransportIndexAction;
29+
import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction;
2930
import org.elasticsearch.action.search.SearchTransportService;
3031
import org.elasticsearch.action.search.TransportSearchAction;
3132
import org.elasticsearch.action.support.WriteRequest;
@@ -393,6 +394,7 @@ public void testSearchTaskDescriptions() {
393394
taskInfo.description(),
394395
Regex.simpleMatch("id[*], size[1], lastEmittedDoc[null]", taskInfo.description())
395396
);
397+
case SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME -> assertThat(taskInfo.description(), notNullValue());
396398
default -> fail("Unexpected action [" + taskInfo.action() + "] with description [" + taskInfo.description() + "]");
397399
}
398400
// assert that all task descriptions have non-zero length

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.logging.log4j.Logger;
1414
import org.apache.lucene.search.ScoreDoc;
1515
import org.apache.lucene.search.TopFieldDocs;
16+
import org.elasticsearch.ExceptionsHelper;
1617
import org.elasticsearch.TransportVersions;
1718
import org.elasticsearch.action.ActionListener;
1819
import org.elasticsearch.action.IndicesRequest;
@@ -76,7 +77,7 @@
7677

7778
import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize;
7879

79-
class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {
80+
public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {
8081

8182
private static final Logger logger = LogManager.getLogger(SearchQueryThenFetchAsyncAction.class);
8283

@@ -445,15 +446,19 @@ protected void run() {
445446
)
446447
);
447448
} else {
448-
performPhaseOnShard(shardIndex, shardRoutings, target);
449+
performPhaseOnShard(shardIndex, shardRoutings, routing);
449450
}
450451
}
451452
}
452453
perNodeQueries.forEach((routing, request) -> {
453454
if (request.shards.size() == 1) {
454455
var shard = request.shards.getFirst();
455456
final int sidx = shard.shardIndex;
456-
this.performPhaseOnShard(sidx, shardIterators[sidx], routing);
457+
this.performPhaseOnShard(
458+
sidx,
459+
shardIterators[sidx],
460+
new SearchShardTarget(routing.nodeId(), shard.shardId, routing.clusterAlias())
461+
);
457462
return;
458463
}
459464
final Transport.Connection connection;
@@ -466,7 +471,11 @@ protected void run() {
466471
if (connection.getTransportVersion().before(TransportVersions.BATCHED_QUERY_PHASE_VERSION)) {
467472
for (ShardToQuery shard : request.shards) {
468473
final int sidx = shard.shardIndex;
469-
this.performPhaseOnShard(sidx, shardIterators[sidx], routing);
474+
this.performPhaseOnShard(
475+
sidx,
476+
shardIterators[sidx],
477+
new SearchShardTarget(routing.nodeId(), shard.shardId, routing.clusterAlias())
478+
);
470479
}
471480
return;
472481
}
@@ -507,7 +516,11 @@ public void handleResponse(NodeQueryResponse response) {
507516

508517
@Override
509518
public void handleException(TransportException e) {
510-
onNodeQueryFailure(e, request, routing);
519+
Exception cause = (Exception) ExceptionsHelper.unwrapCause(e);
520+
if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) {
521+
queryPhaseResultConsumer.failure.compareAndSet(null, cause);
522+
}
523+
onPhaseFailure(getName(), "", cause);
511524
}
512525
});
513526
});
@@ -520,17 +533,12 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
520533
}
521534
}
522535

523-
protected void performPhaseOnShard(
524-
final int shardIndex,
525-
final SearchShardIterator shardIt,
526-
final CanMatchPreFilterSearchPhase.SendingTarget target
527-
) {
528-
final var searchShardTarget = new SearchShardTarget(target.nodeId(), shardIt.shardId(), target.clusterAlias());
536+
protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
529537
final Transport.Connection connection;
530538
try {
531-
connection = getConnection(target.clusterAlias(), target.nodeId());
539+
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
532540
} catch (Exception e) {
533-
onShardFailure(shardIndex, searchShardTarget, shardIt, e);
541+
onShardFailure(shardIndex, shard, shardIt, e);
534542
return;
535543
}
536544
final String indexUUID = shardIt.shardId().getIndex().getUUID();
@@ -555,7 +563,7 @@ protected void performPhaseOnShard(
555563
)
556564
),
557565
task,
558-
new SearchActionListener<>(searchShardTarget, shardIndex) {
566+
new SearchActionListener<>(shard, shardIndex) {
559567
@Override
560568
public void innerOnResponse(SearchPhaseResult result) {
561569
try {
@@ -568,20 +576,12 @@ public void innerOnResponse(SearchPhaseResult result) {
568576

569577
@Override
570578
public void onFailure(Exception e) {
571-
onShardFailure(shardIndex, searchShardTarget, shardIt, e);
579+
onShardFailure(shardIndex, shard, shardIt, e);
572580
}
573581
}
574582
);
575583
}
576584

577-
@Override
578-
public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) {
579-
assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]";
580-
if (connection != null) {
581-
searchTransportService.sendFreeContext(connection, contextId, ActionListener.noop());
582-
}
583-
}
584-
585585
public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]";
586586

587587
private static final CircuitBreaker NOOP_CIRCUIT_BREAKER = new NoopCircuitBreaker("request");

0 commit comments

Comments
 (0)