1616import org .elasticsearch .TransportVersions ;
1717import org .elasticsearch .action .ActionListener ;
1818import org .elasticsearch .action .IndicesRequest ;
19- import org .elasticsearch .action .NoShardAvailableActionException ;
2019import org .elasticsearch .action .OriginalIndices ;
2120import org .elasticsearch .action .support .ChannelActionListener ;
2221import org .elasticsearch .action .support .IndicesOptions ;
5352import org .elasticsearch .threadpool .ThreadPool ;
5453import org .elasticsearch .transport .LeakTracker ;
5554import org .elasticsearch .transport .Transport ;
55+ import org .elasticsearch .transport .TransportActionProxy ;
5656import org .elasticsearch .transport .TransportChannel ;
5757import org .elasticsearch .transport .TransportException ;
5858import org .elasticsearch .transport .TransportRequest ;
@@ -285,19 +285,22 @@ public static class NodeQueryRequest extends TransportRequest implements Indices
285285 private final Map <String , AliasFilter > aliasFilters ;
286286 private final int totalShards ;
287287 private final long absoluteStartMillis ;
288+ private final String localClusterAlias ;
288289
289290 private NodeQueryRequest (
290291 List <ShardToQuery > shards ,
291292 SearchRequest searchRequest ,
292293 Map <String , AliasFilter > aliasFilters ,
293294 int totalShards ,
294- long absoluteStartMillis
295+ long absoluteStartMillis ,
296+ String localClusterAlias
295297 ) {
296298 this .shards = shards ;
297299 this .searchRequest = searchRequest ;
298300 this .aliasFilters = aliasFilters ;
299301 this .totalShards = totalShards ;
300302 this .absoluteStartMillis = absoluteStartMillis ;
303+ this .localClusterAlias = localClusterAlias ;
301304 }
302305
303306 private NodeQueryRequest (StreamInput in ) throws IOException {
@@ -307,6 +310,7 @@ private NodeQueryRequest(StreamInput in) throws IOException {
307310 this .aliasFilters = in .readImmutableMap (AliasFilter ::readFrom );
308311 this .totalShards = in .readVInt ();
309312 this .absoluteStartMillis = in .readLong ();
313+ this .localClusterAlias = in .readOptionalString ();
310314 }
311315
312316 @ Override
@@ -322,6 +326,7 @@ public void writeTo(StreamOutput out) throws IOException {
322326 out .writeMap (aliasFilters , (o , v ) -> v .writeTo (o ));
323327 out .writeVInt (totalShards );
324328 out .writeLong (absoluteStartMillis );
329+ out .writeOptionalString (localClusterAlias );
325330 }
326331
327332 @ Override
@@ -395,16 +400,18 @@ private static boolean isPartOfPIT(SearchRequest request, ShardSearchContextId c
395400
396401 @ Override
397402 protected void run () {
403+ for (final SearchShardIterator iterator : toSkipShardsIts ) {
404+ assert iterator .skip ();
405+ skipShard (iterator );
406+ }
398407 // TODO: stupid but we kinda need to fill all of these in with the current logic, do something nicer before merging
399408 final Map <SearchShardIterator , Integer > shardIndexMap = Maps .newHashMapWithExpectedSize (shardIterators .length );
400409 for (int i = 0 ; i < shardIterators .length ; i ++) {
401410 shardIndexMap .put (shardIterators [i ], i );
402411 }
403- final boolean supportsBatchedQuery = minTransportVersion .onOrAfter (TransportVersions .BATCHED_QUERY_PHASE_VERSION );
404- final Map <String , NodeQueryRequest > perNodeQueries = new HashMap <>();
412+ final Map <CanMatchPreFilterSearchPhase .SendingTarget , NodeQueryRequest > perNodeQueries = new HashMap <>();
405413 AbstractSearchAsyncAction .doCheckNoMissingShards (getName (), request , shardsIts , AbstractSearchAsyncAction ::makeMissingShardsError );
406414 final String localNodeId = searchTransportService .transportService ().getLocalNode ().getId ();
407- final String localClusterAlias = request .getLocalClusterAlias ();
408415 for (int i = 0 ; i < shardsIts .size (); i ++) {
409416 final SearchShardIterator shardRoutings = shardsIts .get (i );
410417 assert shardRoutings .skip () == false ;
@@ -414,19 +421,19 @@ protected void run() {
414421 if (routing == null ) {
415422 failOnUnavailable (shardIndex , shardRoutings );
416423 } else {
417- String clusterAlias = routing .getClusterAlias ();
418424 final String nodeId = routing .getNodeId ();
419- if ( supportsBatchedQuery
420- && localNodeId . equals ( nodeId ) == false // local requests don't need batching as there's no network latency
421- && ( clusterAlias == null || Objects .equals (localClusterAlias , clusterAlias )) ) {
425+ // local requests don't need batching as there's no network latency
426+ final var target = new CanMatchPreFilterSearchPhase . SendingTarget ( routing . getClusterAlias (), routing . getNodeId ());
427+ if ( localNodeId .equals (nodeId ) == false ) {
422428 perNodeQueries .computeIfAbsent (
423- nodeId ,
429+ target ,
424430 ignored -> new NodeQueryRequest (
425431 new ArrayList <>(),
426432 request ,
427433 aliasFilter ,
428434 shardsIts .size (),
429- timeProvider .absoluteStartMillis ()
435+ timeProvider .absoluteStartMillis (),
436+ routing .getClusterAlias ()
430437 )
431438 ).shards .add (
432439 new ShardToQuery (
@@ -438,22 +445,29 @@ protected void run() {
438445 )
439446 );
440447 } else {
441- performPhaseOnShard (shardIndex , shardRoutings , routing );
448+ performPhaseOnShard (shardIndex , shardRoutings , target );
442449 }
443450 }
444451 }
445- perNodeQueries .forEach ((nodeId , request ) -> {
452+ perNodeQueries .forEach ((routing , request ) -> {
446453 if (request .shards .size () == 1 ) {
447454 var shard = request .shards .getFirst ();
448455 final int sidx = shard .shardIndex ;
449- this .performPhaseOnShard (sidx , shardIterators [sidx ], new SearchShardTarget ( nodeId , shard . shardId , localClusterAlias ) );
456+ this .performPhaseOnShard (sidx , shardIterators [sidx ], routing );
450457 return ;
451458 }
452459 final Transport .Connection connection ;
453460 try {
454- connection = getConnection (localClusterAlias , nodeId );
461+ connection = getConnection (routing . clusterAlias (), routing . nodeId () );
455462 } catch (Exception e ) {
456- onNodeQueryFailure (e , request , nodeId );
463+ onNodeQueryFailure (e , request , routing );
464+ return ;
465+ }
466+ if (connection .getTransportVersion ().before (TransportVersions .BATCHED_QUERY_PHASE_VERSION )) {
467+ for (ShardToQuery shard : request .shards ) {
468+ final int sidx = shard .shardIndex ;
469+ this .performPhaseOnShard (sidx , shardIterators [sidx ], routing );
470+ }
457471 return ;
458472 }
459473 searchTransportService .transportService ()
@@ -476,7 +490,7 @@ public void handleResponse(NodeQueryResponse response) {
476490 for (int i = 0 ; i < response .results .length ; i ++) {
477491 var s = request .shards .get (i );
478492 int shardIdx = s .shardIndex ;
479- final SearchShardTarget target = new SearchShardTarget (nodeId , s .shardId , localClusterAlias );
493+ final SearchShardTarget target = new SearchShardTarget (routing . nodeId () , s .shardId , routing . clusterAlias () );
480494 switch (response .results [i ]) {
481495 case Exception e -> onShardFailure (shardIdx , target , shardIterators [shardIdx ], e );
482496 case SearchPhaseResult q -> {
@@ -493,30 +507,30 @@ public void handleResponse(NodeQueryResponse response) {
493507
494508 @ Override
495509 public void handleException (TransportException e ) {
496- onNodeQueryFailure (e , request , nodeId );
510+ onNodeQueryFailure (e , request , routing );
497511 }
498512 });
499513 });
500514 }
501515
502- private void onNodeQueryFailure (Exception e , NodeQueryRequest request , String nodeId ) {
516+ private void onNodeQueryFailure (Exception e , NodeQueryRequest request , CanMatchPreFilterSearchPhase . SendingTarget target ) {
503517 for (ShardToQuery shard : request .shards ) {
504518 int idx = shard .shardIndex ;
505- onShardFailure (
506- idx ,
507- new SearchShardTarget (nodeId , shard .shardId , request .searchRequest .getLocalClusterAlias ()),
508- shardIterators [idx ],
509- e
510- );
519+ onShardFailure (idx , new SearchShardTarget (target .nodeId (), shard .shardId , target .clusterAlias ()), shardIterators [idx ], e );
511520 }
512521 }
513522
514- protected void performPhaseOnShard (final int shardIndex , final SearchShardIterator shardIt , final SearchShardTarget shard ) {
523+ protected void performPhaseOnShard (
524+ final int shardIndex ,
525+ final SearchShardIterator shardIt ,
526+ final CanMatchPreFilterSearchPhase .SendingTarget target
527+ ) {
528+ final var searchShardTarget = new SearchShardTarget (target .nodeId (), shardIt .shardId (), target .clusterAlias ());
515529 final Transport .Connection connection ;
516530 try {
517- connection = getConnection (shard . getClusterAlias (), shard . getNodeId ());
531+ connection = getConnection (target . clusterAlias (), target . nodeId ());
518532 } catch (Exception e ) {
519- onShardFailure (shardIndex , shard , shardIt , e );
533+ onShardFailure (shardIndex , searchShardTarget , shardIt , e );
520534 return ;
521535 }
522536 final String indexUUID = shardIt .shardId ().getIndex ().getUUID ();
@@ -541,21 +555,20 @@ protected void performPhaseOnShard(final int shardIndex, final SearchShardIterat
541555 )
542556 ),
543557 task ,
544- new SearchActionListener <>(shard , shardIndex ) {
558+ new SearchActionListener <>(searchShardTarget , shardIndex ) {
545559 @ Override
546560 public void innerOnResponse (SearchPhaseResult result ) {
547561 try {
548562 onShardResult (result );
549563 } catch (Exception exc ) {
550564 // TODO: this looks like a nasty bug where it to actually happen
551565 assert false : exc ;
552- onShardFailure (shardIndex , shard , shardIt , exc );
553566 }
554567 }
555568
556569 @ Override
557570 public void onFailure (Exception e ) {
558- onShardFailure (shardIndex , shard , shardIt , e );
571+ onShardFailure (shardIndex , searchShardTarget , shardIt , e );
559572 }
560573 }
561574 );
@@ -608,7 +621,7 @@ public static void registerNodeSearchAction(SearchTransportService searchTranspo
608621 }
609622 }
610623 );
611-
624+ TransportActionProxy . registerProxyAction ( transportService , NODE_SEARCH_ACTION_NAME , true , NodeQueryResponse :: new );
612625 }
613626
614627 private static void maybeRelease (SearchService searchService , NodeQueryRequest request , SearchPhaseResult result ) {
@@ -677,7 +690,7 @@ protected void doRun() {
677690 state .trackTotalHitsUpTo ,
678691 buildShardSearchRequest (
679692 shardId ,
680- searchRequest . getLocalClusterAlias () ,
693+ request . localClusterAlias ,
681694 shardToQuery .shardIndex ,
682695 shardToQuery .contextId ,
683696 shardToQuery .originalIndices ,
@@ -697,7 +710,7 @@ public void onResponse(SearchPhaseResult searchPhaseResult) {
697710 try {
698711 searchPhaseResult .setShardIndex (dataNodeLocalIdx );
699712 searchPhaseResult .setSearchShardTarget (
700- new SearchShardTarget (null , shardToQuery .shardId , request .searchRequest . getLocalClusterAlias () )
713+ new SearchShardTarget (null , shardToQuery .shardId , request .localClusterAlias )
701714 );
702715 // no need for any cache effects when we're already flipped to ture => plain read + set-release
703716 state .hasResponse .compareAndExchangeRelease (false , true );
0 commit comments