Skip to content

Commit 3a553b6

Browse files
committed
Unhollow on first ingestion
1 parent 47f796f commit 3a553b6

File tree

2 files changed

+49
-25
lines changed

2 files changed

+49
-25
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1800,12 +1800,26 @@ public void preRecovery(ActionListener<Void> listener) {
18001800
indexEventListener.beforeIndexShardRecovery(this, indexSettings, listener);
18011801
}
18021802

1803+
public volatile Releasable hollowPermits = null;
1804+
public AtomicBoolean unhollowing = new AtomicBoolean(false);
1805+
18031806
public void postRecovery(String reason, ActionListener<Void> listener) throws IndexShardStartedException, IndexShardRelocatedException,
18041807
IndexShardClosedException {
18051808
assert postRecoveryComplete == null;
18061809
SubscribableListener<Void> subscribableListener = new SubscribableListener<>();
18071810
postRecoveryComplete = subscribableListener;
18081811
final ActionListener<Void> finalListener = ActionListener.runBefore(listener, () -> subscribableListener.onResponse(null));
1812+
final ActionListener<Void> finalHollowListener = ActionListener.wrap(ignored -> {
1813+
if (shardRouting.primary()) {
1814+
this.asyncBlockOperations(ActionListener.wrap(r -> {
1815+
logger.warn("IRAKLIS got permits for shard " + shardId);
1816+
hollowPermits = r;
1817+
finalListener.onResponse(null);
1818+
}, finalListener::onFailure), TimeValue.timeValueMinutes(1L).duration(), TimeUnit.MINUTES);
1819+
} else {
1820+
finalListener.onResponse(null);
1821+
}
1822+
}, finalListener::onFailure);
18091823
try {
18101824
getEngine().refresh("post_recovery");
18111825
// we need to refresh again to expose all operations that were index until now. Otherwise
@@ -1821,10 +1835,40 @@ public void postRecovery(String reason, ActionListener<Void> listener) throws In
18211835
recoveryState.setStage(RecoveryState.Stage.DONE);
18221836
changeState(IndexShardState.POST_RECOVERY, reason);
18231837
}
1824-
indexEventListener.afterIndexShardRecovery(this, finalListener);
1838+
indexEventListener.afterIndexShardRecovery(this, finalHollowListener);
18251839
} catch (Exception e) {
1826-
finalListener.onFailure(e);
1840+
finalHollowListener.onFailure(e);
1841+
}
1842+
}
1843+
1844+
public void unhollow() {
1845+
if (hollowPermits != null && unhollowing.compareAndSet(false, true)) {
1846+
threadPool.generic().execute(() -> {
1847+
ActionListener.run(ActionListener.<Void>wrap(ignored -> {
1848+
final EngineConfig config = newEngineConfig(replicationTracker);
1849+
config.setEnableGcDeletes(false);
1850+
synchronized (engineMutex) {
1851+
IOUtils.close(currentEngineReference.get());
1852+
final Engine newEngine = createEngine(config);
1853+
currentEngineReference.set(newEngine);
1854+
onNewEngine(newEngine);
1855+
active.set(true);
1856+
}
1857+
onSettingsChanged();
1858+
checkAndCallWaitForEngineOrClosedShardListeners();
1859+
1860+
getEngine().skipTranslogRecovery();
1861+
getEngine().refresh("post_recovery");
1862+
indexEventListener.afterIndexShardRecovery(this, ActionListener.wrap(r -> {
1863+
hollowPermits.close();
1864+
logger.warn("IRAKLIS released permits for shard " + shardId);
1865+
hollowPermits = null;
1866+
unhollowing.set(false);
1867+
}, e -> { assert false : e; }));
1868+
}, e -> { assert false : e; }), l -> indexEventListener.beforeIndexShardRecovery(this, indexSettings, l));
1869+
});
18271870
}
1871+
;
18281872
}
18291873

18301874
/**
@@ -3553,6 +3597,7 @@ public void acquirePrimaryOperationPermit(
35533597
) {
35543598
verifyNotClosed();
35553599
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
3600+
unhollow();
35563601
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution);
35573602
}
35583603

@@ -3600,6 +3645,7 @@ private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, l
36003645
onPermitAcquired.onFailure(e);
36013646
});
36023647
try {
3648+
unhollow();
36033649
indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic());
36043650
} catch (Exception e) {
36053651
forceRefreshes.close();
@@ -4263,28 +4309,6 @@ public void afterRefresh(boolean didRefresh) {
42634309
}
42644310
}
42654311

4266-
public void myReset(ActionListener<Void> listener) throws IOException {
4267-
ActionListener.run(ActionListener.<Void>wrap(ignored -> {
4268-
ActionListener.run(listener, l -> {
4269-
final EngineConfig config = newEngineConfig(replicationTracker);
4270-
config.setEnableGcDeletes(false);
4271-
synchronized (engineMutex) {
4272-
IOUtils.close(currentEngineReference.get());
4273-
final Engine newEngine = createEngine(config);
4274-
currentEngineReference.set(newEngine);
4275-
onNewEngine(newEngine);
4276-
active.set(true);
4277-
}
4278-
onSettingsChanged();
4279-
checkAndCallWaitForEngineOrClosedShardListeners();
4280-
4281-
getEngine().skipTranslogRecovery();
4282-
getEngine().refresh("post_recovery");
4283-
indexEventListener.afterIndexShardRecovery(this, l);
4284-
});
4285-
}, e -> { listener.onFailure(e); }), l -> indexEventListener.beforeIndexShardRecovery(this, indexSettings, l));
4286-
}
4287-
42884312
/**
42894313
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
42904314
*/

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1336,7 +1336,7 @@ private void assertNoPendingIndexOperations() throws Exception {
13361336
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
13371337
for (IndexService indexService : indexServices) {
13381338
for (IndexShard indexShard : indexService) {
1339-
assertEquals(0, indexShard.getActiveOperationsCount());
1339+
assertEquals("shard " + indexShard.shardId() + " non-zero op count", 0, indexShard.getActiveOperationsCount());
13401340
}
13411341
}
13421342
}

0 commit comments

Comments
 (0)