@@ -317,6 +317,13 @@ protected abstract void shardOperationOnReplica(
317317 ActionListener <ReplicaResult > listener
318318 );
319319
320+ /**
321+ * During Resharding, we might need to split the primary request.
322+ */
323+ protected Map <ShardId , Request > splitRequestOnPrimary (Request request ) {
324+ return Map .of (request .shardId (), request );
325+ }
326+
320327 /**
321328 * Cluster level block to check before request execution. Returning null means that no blocks need to be checked.
322329 */
@@ -508,54 +515,127 @@ public void handleException(TransportException exp) {
508515 }
509516 }
510517 );
511- } else {
512- setPhase (replicationTask , "primary" );
513-
514- final ActionListener <Response > responseListener = ActionListener .wrap (response -> {
515- adaptResponse (response , primaryShardReference .indexShard );
516-
517- if (syncGlobalCheckpointAfterOperation ) {
518- try {
519- primaryShardReference .indexShard .maybeSyncGlobalCheckpoint ("post-operation" );
520- } catch (final Exception e ) {
521- // only log non-closed exceptions
522- if (ExceptionsHelper .unwrap (e , AlreadyClosedException .class , IndexShardClosedException .class ) == null ) {
523- // intentionally swallow, a missed global checkpoint sync should not fail this operation
524- logger .info (
525- () -> format (
526- "%s failed to execute post-operation global checkpoint sync" ,
527- primaryShardReference .indexShard .shardId ()
528- ),
529- e
530- );
531- }
518+ } else if (reshardSplitShardCountSummary .isUnset ()
519+ || reshardSplitShardCountSummary .equals (
520+ SplitShardCountSummary .forIndexing (indexMetadata , primaryRequest .getRequest ().shardId ().getId ())
521+ ) == false ) {
522+ // Split Request
523+ Map <ShardId , Request > splitRequests = splitRequestOnPrimary (primaryRequest .getRequest ());
524+ int numSplitRequests = splitRequests .size ();
525+
526+ // splitRequestOnPrimary must handle the case when the request has no items
527+ assert numSplitRequests > 0 : "expected atleast 1 split request" ;
528+ assert numSplitRequests <= 2 : "number of split requests too many" ;
529+
530+ // System.out.println("numSplitRequests = " + numSplitRequests);
531+ // System.out.println("source shardId = " + primaryRequest.getRequest().shardId().toString());
532+ if (numSplitRequests == 1 ) {
533+ // System.out.println("shardId = " + splitRequests.entrySet().iterator().next().getKey().toString());
534+ // If the request is for source, same behaviour as before
535+ if (splitRequests .containsKey (primaryRequest .getRequest ().shardId ())) {
536+ // System.out.println("Execute request on source");
537+ executePrimaryRequest (primaryShardReference , "primary" );
538+ // executePrimaryRequest(primaryShardReference, "primary_reshardSplit");
539+ } else {
540+ // System.out.println("Execute request on target");
541+ // If the request is for target, forward request to target.
542+ // TODO: Note that the request still contains the original shardId. We need to test if this will be a
543+ // problem.
544+ setPhase (replicationTask , "primary_reshardSplit_delegation" );
545+ // If the request is for target, send request to target node
546+ ShardId targetShardId = splitRequests .entrySet ().iterator ().next ().getKey ();
547+ final IndexShard targetShard = getIndexShard (targetShardId );
548+ final ShardRouting target = targetShard .routingEntry ();
549+ final Writeable .Reader <Response > reader = TransportReplicationAction .this ::newResponseInstance ;
550+ DiscoveryNode targetNode = clusterState .nodes ().get (target .currentNodeId ());
551+ transportService .sendRequest (
552+ targetNode ,
553+ transportPrimaryAction ,
554+ new ConcreteShardRequest <>(
555+ primaryRequest .getRequest (),
556+ target .allocationId ().getRelocationId (),
557+ primaryRequest .getPrimaryTerm ()
558+ ),
559+ transportOptions ,
560+ new ActionListenerResponseHandler <>(
561+ onCompletionListener ,
562+ reader ,
563+ TransportResponseHandler .TRANSPORT_WORKER
564+ ) {
565+
566+ @ Override
567+ public void handleResponse (Response response ) {
568+ setPhase (replicationTask , "finished" );
569+ super .handleResponse (response );
570+ }
571+
572+ @ Override
573+ public void handleException (TransportException exp ) {
574+ setPhase (replicationTask , "finished" );
575+ super .handleException (exp );
576+ }
577+ }
578+ );
532579 }
533- }
580+ } else {
581+ // TODO:
582+ // We have requests for both source and target shards.
583+ // Use a refcounted listener to run both requests async in parallel and collect the responses from both requests
534584
535- assert primaryShardReference .indexShard .isPrimaryMode ();
536- primaryShardReference .close (); // release shard operation lock before responding to caller
537- setPhase (replicationTask , "finished" );
538- onCompletionListener .onResponse (response );
539- }, e -> handleException (primaryShardReference , e ));
540-
541- new ReplicationOperation <>(
542- primaryRequest .getRequest (),
543- primaryShardReference ,
544- responseListener .map (result -> result .replicationResponse ),
545- newReplicasProxy (),
546- logger ,
547- threadPool ,
548- actionName ,
549- primaryRequest .getPrimaryTerm (),
550- initialRetryBackoffBound ,
551- retryTimeout
552- ).execute ();
553- }
585+ // Merge responses from source and target before calling onCompletionListener
586+ }
587+ } else {
588+ executePrimaryRequest (primaryShardReference , "primary" );
589+ }
554590 } catch (Exception e ) {
555591 handleException (primaryShardReference , e );
556592 }
557593 }
558594
/**
 * Executes the primary phase of this replication action on the local primary shard:
 * runs the operation via {@link ReplicationOperation} (which also replicates to
 * replicas), optionally attempts a post-operation global checkpoint sync, releases
 * the primary shard reference, and completes {@code onCompletionListener}.
 *
 * @param primaryShardReference an acquired reference to the local primary shard;
 *                              released here (success path) or by
 *                              {@code handleException} (failure path) before the
 *                              caller is notified
 * @param phase                 phase name to record on the replication task while
 *                              the primary operation runs
 * @throws Exception if constructing or starting the replication operation fails
 */
private void executePrimaryRequest(final PrimaryShardReference primaryShardReference, String phase) throws Exception {
    setPhase(replicationTask, phase);

    final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
        // let subclasses adjust the response against the primary shard state
        adaptResponse(response, primaryShardReference.indexShard);

        if (syncGlobalCheckpointAfterOperation) {
            try {
                primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
            } catch (final Exception e) {
                // only log non-closed exceptions
                if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                    // intentionally swallow, a missed global checkpoint sync should not fail this operation
                    logger.info(
                        () -> format(
                            "%s failed to execute post-operation global checkpoint sync",
                            primaryShardReference.indexShard.shardId()
                        ),
                        e
                    );
                }
            }
        }

        assert primaryShardReference.indexShard.isPrimaryMode();
        primaryShardReference.close(); // release shard operation lock before responding to caller
        setPhase(replicationTask, "finished");
        onCompletionListener.onResponse(response);
    }, e -> handleException(primaryShardReference, e));

    // Drive the primary operation plus replica fan-out; the listener above fires
    // once replication has completed (or failed).
    new ReplicationOperation<>(
        primaryRequest.getRequest(),
        primaryShardReference,
        responseListener.map(result -> result.replicationResponse),
        newReplicasProxy(),
        logger,
        threadPool,
        actionName,
        primaryRequest.getPrimaryTerm(),
        initialRetryBackoffBound,
        retryTimeout
    ).execute();
}
638+
559639 private void handleException (PrimaryShardReference primaryShardReference , Exception e ) {
560640 Releasables .closeWhileHandlingException (primaryShardReference ); // release shard operation lock before responding to caller
561641 onFailure (e );
0 commit comments