@@ -776,6 +776,7 @@ public void relocated(
776776 final String targetNodeId ,
777777 final String targetAllocationId ,
778778 final BiConsumer <ReplicationTracker .PrimaryContext , ActionListener <Void >> consumer ,
779+ // TODO primary permits if already held to release after primary context
779780 final ActionListener <Void > listener
780781 ) throws IllegalIndexShardStateException , IllegalStateException {
781782 assert shardRouting .primary () : "only primaries can be marked as relocated: " + shardRouting ;
@@ -1827,6 +1828,23 @@ public void postRecovery(String reason, ActionListener<Void> listener) throws In
18271828 }
18281829 }
18291830
1831+ /**
1832+ * Resets the engine, by closing the current engine reference and creating a new engine.
1833+ */
1834+ public void resetEngine () throws IOException {
1835+ assert indexShardOperationPermits .getActiveOperationsCount () == OPERATIONS_BLOCKED : "must hold permits to reset the engine" ;
1836+ final EngineConfig config = newEngineConfig (replicationTracker );
1837+ synchronized (engineMutex ) {
1838+ IOUtils .close (currentEngineReference .get ());
1839+ final Engine newEngine = createEngine (config );
1840+ currentEngineReference .set (newEngine );
1841+ onNewEngine (newEngine );
1842+ active .set (true );
1843+ }
1844+ onSettingsChanged ();
1845+ checkAndCallWaitForEngineOrClosedShardListeners ();
1846+ }
1847+
18301848 /**
18311849 * called before starting to copy index files over
18321850 */
@@ -3553,6 +3571,7 @@ public void acquirePrimaryOperationPermit(
35533571 ) {
35543572 verifyNotClosed ();
35553573 assert shardRouting .primary () : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting ;
3574+ indexEventListener .onPrimaryPermitAcquire (this );
35563575 indexShardOperationPermits .acquire (wrapPrimaryOperationPermitListener (onPermitAcquired ), executorOnDelay , forceExecution );
35573576 }
35583577
@@ -3581,25 +3600,32 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
35813600 */
35823601 private ActionListener <Releasable > wrapPrimaryOperationPermitListener (final ActionListener <Releasable > listener ) {
35833602 return listener .delegateFailure ((l , r ) -> {
3584- if (isPrimaryMode ()) {
3585- l .onResponse (r );
3603+ var wrappedReleasable = wrapPrimaryPermitReleasable (r );
3604+ if (isPrimaryMode () || state () == IndexShardState .POST_RECOVERY ) {
3605+ l .onResponse (wrappedReleasable );
35863606 } else {
3587- r .close ();
3607+ wrappedReleasable .close ();
35883608 l .onFailure (new ShardNotInPrimaryModeException (shardId , state ));
35893609 }
35903610 });
35913611 }
35923612
3613+ private Releasable wrapPrimaryPermitReleasable (Releasable releasable ) {
3614+ return Releasables .wrap (releasable , () -> { indexEventListener .onPrimaryPermitReleased (this ); });
3615+ }
3616+
35933617 private void asyncBlockOperations (ActionListener <Releasable > onPermitAcquired , long timeout , TimeUnit timeUnit ) {
35943618 final Releasable forceRefreshes = refreshListeners .forceRefreshes ();
35953619 final ActionListener <Releasable > wrappedListener = ActionListener .wrap (r -> {
3620+ var wrappedReleasable = wrapPrimaryPermitReleasable (r );
35963621 forceRefreshes .close ();
3597- onPermitAcquired .onResponse (r );
3622+ onPermitAcquired .onResponse (wrappedReleasable );
35983623 }, e -> {
35993624 forceRefreshes .close ();
36003625 onPermitAcquired .onFailure (e );
36013626 });
36023627 try {
3628+ indexEventListener .onPrimaryPermitAcquire (this );
36033629 indexShardOperationPermits .blockOperations (wrappedListener , timeout , timeUnit , threadPool .generic ());
36043630 } catch (Exception e ) {
36053631 forceRefreshes .close ();
@@ -3637,6 +3663,7 @@ private <E extends Exception> void bumpPrimaryTerm(
36373663 assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null );
36383664 assert getOperationPrimaryTerm () <= pendingPrimaryTerm ;
36393665 final CountDownLatch termUpdated = new CountDownLatch (1 );
3666+ indexEventListener .onPrimaryPermitAcquire (this );
36403667 asyncBlockOperations (new ActionListener <Releasable >() {
36413668 @ Override
36423669 public void onFailure (final Exception e ) {
@@ -3659,7 +3686,7 @@ private void innerFail(final Exception e) {
36593686
36603687 @ Override
36613688 public void onResponse (final Releasable releasable ) {
3662- final Releasable releaseOnce = Releasables .releaseOnce (releasable );
3689+ final Releasable releaseOnce = Releasables .releaseOnce (wrapPrimaryPermitReleasable ( releasable ) );
36633690 try {
36643691 assert getOperationPrimaryTerm () <= pendingPrimaryTerm ;
36653692 termUpdated .await ();
0 commit comments