1616import org .elasticsearch .action .ActionListenerResponseHandler ;
1717import org .elasticsearch .action .ActionResponse ;
1818import org .elasticsearch .action .UnavailableShardsException ;
19+ import org .elasticsearch .action .bulk .BulkShardRequest ;
20+ import org .elasticsearch .action .bulk .BulkShardResponse ;
1921import org .elasticsearch .action .support .ActionFilters ;
2022import org .elasticsearch .action .support .ActiveShardCount ;
2123import org .elasticsearch .action .support .ChannelActionListener ;
24+ import org .elasticsearch .action .support .CountDownActionListener ;
25+ import org .elasticsearch .action .support .GroupedActionListener ;
2226import org .elasticsearch .action .support .TransportAction ;
2327import org .elasticsearch .action .support .TransportActions ;
2428import org .elasticsearch .client .internal .transport .NoNodeAvailableException ;
4246import org .elasticsearch .common .settings .Setting ;
4347import org .elasticsearch .common .settings .Settings ;
4448import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
49+ import org .elasticsearch .common .util .concurrent .CountDown ;
4550import org .elasticsearch .common .util .concurrent .EsExecutors ;
4651import org .elasticsearch .core .Assertions ;
4752import org .elasticsearch .core .Nullable ;
4853import org .elasticsearch .core .Releasable ;
4954import org .elasticsearch .core .Releasables ;
5055import org .elasticsearch .core .TimeValue ;
56+ import org .elasticsearch .core .Tuple ;
5157import org .elasticsearch .index .Index ;
5258import org .elasticsearch .index .IndexNotFoundException ;
5359import org .elasticsearch .index .IndexService ;
7581import org .elasticsearch .transport .TransportService ;
7682
7783import java .io .IOException ;
84+ import java .util .ArrayList ;
85+ import java .util .Collections ;
7886import java .util .Map ;
7987import java .util .Objects ;
8088import java .util .Optional ;
89+ import java .util .concurrent .ConcurrentHashMap ;
8190import java .util .concurrent .Executor ;
8291import java .util .concurrent .atomic .AtomicBoolean ;
92+ import java .util .concurrent .atomic .AtomicReference ;
93+ import java .util .concurrent .atomic .AtomicReferenceArray ;
8394
8495import static org .elasticsearch .core .Strings .format ;
8596
@@ -324,6 +335,15 @@ protected Map<ShardId, Request> splitRequestOnPrimary(Request request) {
324335 return Map .of (request .shardId (), request );
325336 }
326337
338+ protected Tuple <Response , Exception > combineSplitResponses (
339+ Request originalRequest ,
340+ Map <ShardId , Request > splitRequests ,
341+ Map <ShardId , Tuple <Response , Exception >> responses
342+ ) {
343+ assert responses .size () == 1 ;
344+ return responses .entrySet ().iterator ().next ().getValue ();
345+ }
346+
327347 /**
328348 * Cluster level block to check before request execution. Returning null means that no blocks need to be checked.
329349 */
@@ -490,107 +510,125 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
490510 // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
491511 final ShardRouting primary = primaryShardReference .routingEntry ();
492512 assert primary .relocating () : "indexShard is marked as relocated but routing isn't" + primary ;
493- final Writeable .Reader <Response > reader = TransportReplicationAction .this ::newResponseInstance ;
494513 DiscoveryNode relocatingNode = clusterState .nodes ().get (primary .relocatingNodeId ());
495- transportService .sendRequest (
496- relocatingNode ,
497- transportPrimaryAction ,
498- new ConcreteShardRequest <>(
499- primaryRequest .getRequest (),
500- primary .allocationId ().getRelocationId (),
501- primaryRequest .getPrimaryTerm ()
502- ),
503- transportOptions ,
504- new ActionListenerResponseHandler <>(onCompletionListener , reader , TransportResponseHandler .TRANSPORT_WORKER ) {
505- @ Override
506- public void handleResponse (Response response ) {
507- setPhase (replicationTask , "finished" );
508- super .handleResponse (response );
509- }
510-
511- @ Override
512- public void handleException (TransportException exp ) {
513- setPhase (replicationTask , "finished" );
514- super .handleException (exp );
515- }
516- }
517- );
514+ delegate (relocatingNode , primary .allocationId ().getRelocationId (), onCompletionListener );
518515 } else if (reshardSplitShardCountSummary .isUnset ()
519516 || reshardSplitShardCountSummary .equals (
520517 SplitShardCountSummary .forIndexing (indexMetadata , primaryRequest .getRequest ().shardId ().getId ())
521518 ) == false ) {
522519 // Split Request
523- Map <ShardId , Request > splitRequests = splitRequestOnPrimary (primaryRequest .getRequest ());
520+ Map <ShardId , Request > splitRequests = Collections .unmodifiableMap (
521+ splitRequestOnPrimary (primaryRequest .getRequest ())
522+ );
524523 int numSplitRequests = splitRequests .size ();
525524
526525 // splitRequestOnPrimary must handle the case when the request has no items
527- assert numSplitRequests > 0 : "expected atleast 1 split request" ;
526+ assert numSplitRequests > 0 : "expected at-least 1 split request" ;
528527 assert numSplitRequests <= 2 : "number of split requests too many" ;
529528
530529 if (numSplitRequests == 1 ) {
531- // System.out.println("shardId = " + splitRequests.entrySet().iterator().next().getKey().toString());
532530 // If the request is for source, same behaviour as before
533531 if (splitRequests .containsKey (primaryRequest .getRequest ().shardId ())) {
534- // System.out.println("Execute request on source");
535- executePrimaryRequest (primaryShardReference , "primary" );
536- // executePrimaryRequest(primaryShardReference, "primary_reshardSplit");
532+ executePrimaryRequest (primaryShardReference , "primary" , onCompletionListener );
537533 } else {
538- // System.out.println("Execute request on target");
539534 // If the request is for target, forward request to target.
540535 // TODO: Note that the request still contains the original shardId. We need to test if this will be a
541536 // problem.
542- setPhase (replicationTask , "primary_reshardSplit_delegation " );
537+ setPhase (replicationTask , "primary_reshard_target_delegation " );
543538 // If the request is for target, send request to target node
544539 ShardId targetShardId = splitRequests .entrySet ().iterator ().next ().getKey ();
545540 final IndexShard targetShard = getIndexShard (targetShardId );
546541 final ShardRouting target = targetShard .routingEntry ();
547- final Writeable .Reader <Response > reader = TransportReplicationAction .this ::newResponseInstance ;
548542 DiscoveryNode targetNode = clusterState .nodes ().get (target .currentNodeId ());
549- transportService .sendRequest (
550- targetNode ,
551- transportPrimaryAction ,
552- new ConcreteShardRequest <>(
553- primaryRequest .getRequest (),
554- target .allocationId ().getRelocationId (),
555- primaryRequest .getPrimaryTerm ()
556- ),
557- transportOptions ,
558- new ActionListenerResponseHandler <>(
559- onCompletionListener ,
560- reader ,
561- TransportResponseHandler .TRANSPORT_WORKER
562- ) {
563-
564- @ Override
565- public void handleResponse (Response response ) {
566- setPhase (replicationTask , "finished" );
567- super .handleResponse (response );
543+ String allocationID = target .allocationId ().getId ();
544+ delegate (targetNode , allocationID , onCompletionListener );
545+ }
546+ } else {
547+ Map <ShardId , Tuple <Response , Exception >> results = new ConcurrentHashMap <>(splitRequests .size ());
548+ CountDown countDown = new CountDown (splitRequests .size ());
549+ for (Map .Entry <ShardId , Request > splitRequest : splitRequests .entrySet ()) {
550+ ActionListener <Response > listener = new ActionListener <>() {
551+ @ Override
552+ public void onResponse (Response response ) {
553+ results .put (splitRequest .getKey (), new Tuple <>(response , null ));
554+ if (countDown .countDown ()) {
555+ finish ();
568556 }
557+ }
569558
570- @ Override
571- public void handleException (TransportException exp ) {
572- setPhase (replicationTask , "finished" );
573- super .handleException (exp );
559+ @ Override
560+ public void onFailure (Exception e ) {
561+ results .put (splitRequest .getKey (), new Tuple <>(null , e ));
562+ if (countDown .countDown ()) {
563+ finish ();
574564 }
575565 }
576- );
577- }
578- } else {
579- // TODO:
580- // We have requests for both source and target shards.
581- // Use a refcounted listener to run both requests async in parallel and collect the responses from both requests
582566
583- // Merge responses from source and target before calling onCompletionListener
567+ private void finish () {
568+ Tuple <Response , Exception > finalResponse = combineSplitResponses (
569+ primaryRequest .getRequest (),
570+ splitRequests ,
571+ results
572+ );
573+ if (finalResponse .v1 () != null ) {
574+ onCompletionListener .onResponse (finalResponse .v1 ());
575+ } else {
576+ onCompletionListener .onFailure (finalResponse .v2 ());
577+ }
578+ }
579+ };
580+ if (splitRequest .getKey ().equals (primaryRequest .getRequest ().shardId ())) {
581+ executePrimaryRequest (primaryShardReference , "primary" , listener );
582+ } else {
583+ final IndexShard targetShard = getIndexShard (splitRequest .getKey ());
584+ final ShardRouting target = targetShard .routingEntry ();
585+ DiscoveryNode targetNode = clusterState .nodes ().get (target .currentNodeId ());
586+ String allocationID = target .allocationId ().getId ();
587+ delegate (targetNode , allocationID , listener );
588+ }
589+ }
584590 }
585591 } else {
586- executePrimaryRequest (primaryShardReference , "primary" );
592+ executePrimaryRequest (primaryShardReference , "primary" , onCompletionListener );
587593 }
588594 } catch (Exception e ) {
589- handleException (primaryShardReference , e );
595+ Releasables .closeWhileHandlingException (primaryShardReference );
596+ onFailure (e );
590597 }
591598 }
592599
593- private void executePrimaryRequest (final PrimaryShardReference primaryShardReference , String phase ) throws Exception {
        /**
         * Forwards the primary-phase request to another node (a relocation target or a reshard
         * split-target primary) instead of executing it locally.
         * <p>
         * The request is re-wrapped as a {@code ConcreteShardRequest} addressed to the given
         * allocation id, keeping the original primary term. On completion — success or failure —
         * the replication task's phase is marked "finished" before the result is propagated to
         * {@code listener}.
         *
         * @param targetNode   the node that should run the primary phase
         * @param allocationID the allocation id of the shard copy the request is intended for
         * @param listener     notified with the remote response or the transport failure
         */
        private void delegate(DiscoveryNode targetNode, String allocationID, ActionListener<Response> listener) {
            transportService.sendRequest(
                targetNode,
                transportPrimaryAction,
                // Re-target the same inner request at the remote shard copy; the primary term is
                // preserved so the remote side can reject stale delegation.
                new ConcreteShardRequest<>(primaryRequest.getRequest(), allocationID, primaryRequest.getPrimaryTerm()),
                transportOptions,
                new ActionListenerResponseHandler<>(
                    listener,
                    TransportReplicationAction.this::newResponseInstance,
                    TransportResponseHandler.TRANSPORT_WORKER
                ) {

                    @Override
                    public void handleResponse(Response response) {
                        // Mark the task finished before completing the listener so task status
                        // reflects the terminal phase regardless of listener behavior.
                        setPhase(replicationTask, "finished");
                        super.handleResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        // Same terminal-phase bookkeeping on the failure path.
                        setPhase(replicationTask, "finished");
                        super.handleException(exp);
                    }
                }
            );
        }
626+
627+ private void executePrimaryRequest (
628+ final PrimaryShardReference primaryShardReference ,
629+ String phase ,
630+ final ActionListener <Response > listener
631+ ) throws Exception {
594632 setPhase (replicationTask , phase );
595633
596634 final ActionListener <Response > responseListener = ActionListener .wrap (response -> {
@@ -617,7 +655,7 @@ private void executePrimaryRequest(final PrimaryShardReference primaryShardRefer
617655 assert primaryShardReference .indexShard .isPrimaryMode ();
618656 primaryShardReference .close (); // release shard operation lock before responding to caller
619657 setPhase (replicationTask , "finished" );
620- onCompletionListener .onResponse (response );
658+ listener .onResponse (response );
621659 }, e -> handleException (primaryShardReference , e ));
622660
623661 new ReplicationOperation <>(
0 commit comments