@@ -477,10 +477,6 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
477477 }
478478
479479 int reshardSplitShardCountSummary = primaryRequest .getRequest ().reshardSplitShardCountSummary ();
480- assert (reshardSplitShardCountSummary == 0
481- || reshardSplitShardCountSummary == indexMetadata .getReshardSplitShardCountSummaryForIndexing (
482- primaryRequest .getRequest ().shardId ().getId ()
483- ));
484480 if (primaryShardReference .isRelocated ()) {
485481 primaryShardReference .close (); // release shard operation lock as soon as possible
486482 setPhase (replicationTask , "primary_delegation" );
@@ -514,9 +510,9 @@ public void handleException(TransportException exp) {
514510 }
515511 }
516512 );
517- // Replace false with a Reshard shard count mismatch test
518- // (an abstract API that will be implemented by bulk, refresh and flush)
519- } else if ( true ) {
513+ } else if ( reshardSplitShardCountSummary == 0
514+ || reshardSplitShardCountSummary != indexMetadata . getReshardSplitShardCountSummaryForIndexing (
515+ primaryRequest . getRequest (). shardId (). getId ()) ) {
520516 // Split Request
521517 Map <ShardId , Request > splitRequests = splitRequestOnPrimary (primaryRequest .getRequest ());
522518 int numSplitRequests = splitRequests .size ();
@@ -576,57 +572,13 @@ public void handleException(TransportException exp) {
576572 }
577573 } else {
578574 // We have requests for both source and target shards.
579- // Use a refcounted listener to run both requests async in parallel
575+ // Use a refcounted listener to run both requests async in parallel and collect the responses from both requests
580576
581577 // Merge responses from source and target before calling onCompletionListener
582578 }
583579 } else {
584580 executePrimaryRequest (primaryShardReference , "primary" );
585581 }
586- /*
587- setPhase(replicationTask, "primary");
588-
589- final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
590- adaptResponse(response, primaryShardReference.indexShard);
591-
592- if (syncGlobalCheckpointAfterOperation) {
593- try {
594- primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
595- } catch (final Exception e) {
596- // only log non-closed exceptions
597- if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
598- // intentionally swallow, a missed global checkpoint sync should not fail this operation
599- logger.info(
600- () -> format(
601- "%s failed to execute post-operation global checkpoint sync",
602- primaryShardReference.indexShard.shardId()
603- ),
604- e
605- );
606- }
607- }
608- }
609-
610- assert primaryShardReference.indexShard.isPrimaryMode();
611- primaryShardReference.close(); // release shard operation lock before responding to caller
612- setPhase(replicationTask, "finished");
613- onCompletionListener.onResponse(response);
614- }, e -> handleException(primaryShardReference, e));
615-
616- new ReplicationOperation<>(
617- primaryRequest.getRequest(),
618- primaryShardReference,
619- responseListener.map(result -> result.replicationResponse),
620- newReplicasProxy(),
621- logger,
622- threadPool,
623- actionName,
624- primaryRequest.getPrimaryTerm(),
625- initialRetryBackoffBound,
626- retryTimeout
627- ).execute();
628- }
629- */
630582 } catch (Exception e ) {
631583 handleException (primaryShardReference , e );
632584 }
0 commit comments