@@ -512,73 +512,75 @@ public void handleException(TransportException exp) {
512512 );
513513 } else if (reshardSplitShardCountSummary == 0
514514 || reshardSplitShardCountSummary != indexMetadata .getReshardSplitShardCountSummaryForIndexing (
515- primaryRequest .getRequest ().shardId ().getId ())) {
516- // Split Request
517- Map <ShardId , Request > splitRequests = splitRequestOnPrimary (primaryRequest .getRequest ());
518- int numSplitRequests = splitRequests .size ();
519-
520- // splitRequestOnPrimary must handle the case when the request has no items
521- assert numSplitRequests > 0 : "expected atleast 1 split request" ;
522- assert numSplitRequests <= 2 : "number of split requests too many" ;
523-
524- // System.out.println("numSplitRequests = " + numSplitRequests);
525- // System.out.println("source shardId = " + primaryRequest.getRequest().shardId().toString());
526- if (numSplitRequests == 1 ) {
527- // System.out.println("shardId = " + splitRequests.entrySet().iterator().next().getKey().toString());
528- // If the request is for source, same behaviour as before
529- if (splitRequests .containsKey (primaryRequest .getRequest ().shardId ())) {
530- // System.out.println("Execute request on source");
531- executePrimaryRequest (primaryShardReference , "primary" );
532- // executePrimaryRequest(primaryShardReference, "primary_reshardSplit");
533- } else {
534- // System.out.println("Execute request on target");
535- // If the request is for target, forward request to target.
536- // TODO: Note that the request still contains the original shardId. We need to test if this will be a problem.
537- setPhase (replicationTask , "primary_reshardSplit_delegation" );
538- // If the request is for target, send request to target node
539- ShardId targetShardId = splitRequests .entrySet ().iterator ().next ().getKey ();
540- final IndexShard targetShard = getIndexShard (targetShardId );
541- final ShardRouting target = targetShard .routingEntry ();
542- final Writeable .Reader <Response > reader = TransportReplicationAction .this ::newResponseInstance ;
543- DiscoveryNode targetNode = clusterState .nodes ().get (target .currentNodeId ());
544- transportService .sendRequest (
545- targetNode ,
546- transportPrimaryAction ,
547- new ConcreteShardRequest <>(
548- primaryRequest .getRequest (),
549- target .allocationId ().getRelocationId (),
550- primaryRequest .getPrimaryTerm ()
551- ),
552- transportOptions ,
553- new ActionListenerResponseHandler <>(
554- onCompletionListener ,
555- reader ,
556- TransportResponseHandler .TRANSPORT_WORKER
557- ) {
558-
559- @ Override
560- public void handleResponse (Response response ) {
561- setPhase (replicationTask , "finished" );
562- super .handleResponse (response );
515+ primaryRequest .getRequest ().shardId ().getId ()
516+ )) {
517+ // Split Request
518+ Map <ShardId , Request > splitRequests = splitRequestOnPrimary (primaryRequest .getRequest ());
519+ int numSplitRequests = splitRequests .size ();
520+
521+ // splitRequestOnPrimary must handle the case when the request has no items
522+ assert numSplitRequests > 0 : "expected atleast 1 split request" ;
523+ assert numSplitRequests <= 2 : "number of split requests too many" ;
524+
525+ // System.out.println("numSplitRequests = " + numSplitRequests);
526+ // System.out.println("source shardId = " + primaryRequest.getRequest().shardId().toString());
527+ if (numSplitRequests == 1 ) {
528+ // System.out.println("shardId = " + splitRequests.entrySet().iterator().next().getKey().toString());
529+ // If the request is for source, same behaviour as before
530+ if (splitRequests .containsKey (primaryRequest .getRequest ().shardId ())) {
531+ // System.out.println("Execute request on source");
532+ executePrimaryRequest (primaryShardReference , "primary" );
533+ // executePrimaryRequest(primaryShardReference, "primary_reshardSplit");
534+ } else {
535+ // System.out.println("Execute request on target");
536+ // If the request is for target, forward request to target.
537+ // TODO: Note that the request still contains the original shardId. We need to test if this will be a
538+ // problem.
539+ setPhase (replicationTask , "primary_reshardSplit_delegation" );
540+ // If the request is for target, send request to target node
541+ ShardId targetShardId = splitRequests .entrySet ().iterator ().next ().getKey ();
542+ final IndexShard targetShard = getIndexShard (targetShardId );
543+ final ShardRouting target = targetShard .routingEntry ();
544+ final Writeable .Reader <Response > reader = TransportReplicationAction .this ::newResponseInstance ;
545+ DiscoveryNode targetNode = clusterState .nodes ().get (target .currentNodeId ());
546+ transportService .sendRequest (
547+ targetNode ,
548+ transportPrimaryAction ,
549+ new ConcreteShardRequest <>(
550+ primaryRequest .getRequest (),
551+ target .allocationId ().getRelocationId (),
552+ primaryRequest .getPrimaryTerm ()
553+ ),
554+ transportOptions ,
555+ new ActionListenerResponseHandler <>(
556+ onCompletionListener ,
557+ reader ,
558+ TransportResponseHandler .TRANSPORT_WORKER
559+ ) {
560+
561+ @ Override
562+ public void handleResponse (Response response ) {
563+ setPhase (replicationTask , "finished" );
564+ super .handleResponse (response );
565+ }
566+
567+ @ Override
568+ public void handleException (TransportException exp ) {
569+ setPhase (replicationTask , "finished" );
570+ super .handleException (exp );
571+ }
563572 }
573+ );
574+ }
575+ } else {
576+ // We have requests for both source and target shards.
577+ // Use a refcounted listener to run both requests async in parallel and collect the responses from both requests
564578
565- @ Override
566- public void handleException (TransportException exp ) {
567- setPhase (replicationTask , "finished" );
568- super .handleException (exp );
569- }
570- }
571- );
579+ // Merge responses from source and target before calling onCompletionListener
572580 }
573581 } else {
574- // We have requests for both source and target shards.
575- // Use a refcounted listener to run both requests async in parallel and collect the responses from both requests
576-
577- // Merge responses from source and target before calling onCompletionListener
582+ executePrimaryRequest (primaryShardReference , "primary" );
578583 }
579- } else {
580- executePrimaryRequest (primaryShardReference , "primary" );
581- }
582584 } catch (Exception e ) {
583585 handleException (primaryShardReference , e );
584586 }
0 commit comments