Skip to content

Commit 39603ec

Browse files
authored
Reset engine for hollow shards (#120649)
Introduces a function in IndexShard to reset the engine. By default, all Engine implementations in core ES will throw when asked to prepare for a reset. In stateless, we will extend the prepareForEngineReset() function in order to make a hollow commit or an unhollow commit. Based on that commit, the subsequent new engine will be a hollow or unhollow indexing engine. The reset function takes care to close the engine, which waits for all operations to drain. This, together with the fact that ingestion will be blocked in stateless and that there should be no searches in the indexing tier, should ensure there are no unexpected asynchronous side-effects. Relates ES-10600
1 parent 5e662c5 commit 39603ec

File tree

3 files changed

+69
-1
lines changed

3 files changed

+69
-1
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.elasticsearch.index.seqno.SequenceNumbers;
7676
import org.elasticsearch.index.shard.DenseVectorStats;
7777
import org.elasticsearch.index.shard.DocsStats;
78+
import org.elasticsearch.index.shard.IndexShard;
7879
import org.elasticsearch.index.shard.ShardId;
7980
import org.elasticsearch.index.shard.ShardLongFieldRange;
8081
import org.elasticsearch.index.shard.SparseVectorStats;
@@ -2334,4 +2335,15 @@ public record FlushResult(boolean flushPerformed, long generation) {
23342335
public static final long UNKNOWN_GENERATION = -1L;
23352336
public static final FlushResult NO_FLUSH = new FlushResult(false, UNKNOWN_GENERATION);
23362337
}
2338+
2339+
/**
2340+
* Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}.
2341+
*
2342+
* In general, resetting the engine should be done with care, to consider any
2343+
* in-progress operations and listeners (e.g., primary term and generation listeners).
2344+
* At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset.
2345+
*/
2346+
public void prepareForEngineReset() throws IOException {
2347+
throw new UnsupportedOperationException("does not support engine reset");
2348+
}
23372349
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4307,6 +4307,35 @@ public void afterRefresh(boolean didRefresh) {
43074307
}
43084308
}
43094309

4310+
/**
4311+
* Reset the current engine to a new one.
4312+
*
4313+
* Calls {@link Engine#prepareForEngineReset()} on the current engine, then closes it, and loads a new engine without
4314+
* doing any translog recovery.
4315+
*
4316+
* In general, resetting the engine should be done with care, to consider any in-progress operations and listeners.
4317+
* At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset.
4318+
*/
4319+
public void resetEngine() {
4320+
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
4321+
assert waitForEngineOrClosedShardListeners.isDone();
4322+
try {
4323+
synchronized (engineMutex) {
4324+
final var currentEngine = getEngine();
4325+
currentEngine.prepareForEngineReset();
4326+
var engineConfig = newEngineConfig(replicationTracker);
4327+
verifyNotClosed();
4328+
IOUtils.close(currentEngine);
4329+
var newEngine = createEngine(engineConfig);
4330+
currentEngineReference.set(newEngine);
4331+
onNewEngine(newEngine);
4332+
}
4333+
onSettingsChanged();
4334+
} catch (Exception e) {
4335+
failShard("unable to reset engine", e);
4336+
}
4337+
}
4338+
43104339
/**
43114340
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
43124341
*/

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4497,7 +4497,7 @@ public void testSupplyTombstoneDoc() throws Exception {
44974497
closeShards(shard);
44984498
}
44994499

4500-
public void testResetEngine() throws Exception {
4500+
public void testResetEngineToGlobalCheckpoint() throws Exception {
45014501
IndexShard shard = newStartedShard(false);
45024502
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
45034503
long maxSeqNoBeforeRollback = shard.seqNoStats().getMaxSeqNo();
@@ -4559,6 +4559,33 @@ public void testResetEngine() throws Exception {
45594559
closeShard(shard, false);
45604560
}
45614561

4562+
public void testResetEngine() throws Exception {
4563+
var newEngineCreated = new CountDownLatch(2);
4564+
var indexShard = newStartedShard(true, Settings.EMPTY, config -> {
4565+
try {
4566+
return new ReadOnlyEngine(config, null, null, true, Function.identity(), true, true) {
4567+
@Override
4568+
public void prepareForEngineReset() throws IOException {
4569+
;
4570+
}
4571+
};
4572+
} finally {
4573+
newEngineCreated.countDown();
4574+
}
4575+
});
4576+
var newEngineNotification = new CountDownLatch(1);
4577+
indexShard.waitForEngineOrClosedShard(ActionListener.running(newEngineNotification::countDown));
4578+
4579+
var onAcquired = new PlainActionFuture<Releasable>();
4580+
indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L));
4581+
try (var permits = safeGet(onAcquired)) {
4582+
indexShard.resetEngine();
4583+
}
4584+
safeAwait(newEngineCreated);
4585+
safeAwait(newEngineNotification);
4586+
closeShard(indexShard, false);
4587+
}
4588+
45624589
/**
45634590
* This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. Closing a shard while engine is inside
45644591
* resetEngineToGlobalCheckpoint can lead to check index failure in integration tests.

0 commit comments

Comments
 (0)