Skip to content

Commit 591fa87

Browse files
authored
Revive read/write engine lock to guard operations against resets (#126311)
This change re-introduces the engine read/write lock to guard against engine resets. It differs from #124635 on the following: uses the engineMutex for creating/closing engines uses the reentrant r/w lock for retaining engine instances and for resetting the engine acquires the reentrant read lock during refreshes to prevent deadlocks during resets add tests to ensure no deadlock when re-acquiring read lock in refresh listeners Relates ES-11447
1 parent edfb17e commit 591fa87

File tree

12 files changed

+849
-164
lines changed

12 files changed

+849
-164
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti
676676
final CountDownLatch engineResetLatch = new CountDownLatch(1);
677677
shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
678678
try {
679-
shard.resetEngineToGlobalCheckpoint();
679+
shard.rollbackEngineToGlobalCheckpoint();
680680
} finally {
681681
r.close();
682682
engineResetLatch.countDown();

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8484
config.getRelativeTimeInNanosSupplier(),
8585
config.getIndexCommitListener(),
8686
config.isPromotableToPrimary(),
87-
config.getMapperService()
87+
config.getMapperService(),
88+
config.getEngineResetLock()
8889
);
8990
}
9091

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.common.util.concurrent.ReleasableLock;
5353
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
5454
import org.elasticsearch.core.AbstractRefCounted;
55+
import org.elasticsearch.core.Assertions;
5556
import org.elasticsearch.core.CheckedRunnable;
5657
import org.elasticsearch.core.Nullable;
5758
import org.elasticsearch.core.RefCounted;
@@ -76,6 +77,7 @@
7677
import org.elasticsearch.index.seqno.SequenceNumbers;
7778
import org.elasticsearch.index.shard.DenseVectorStats;
7879
import org.elasticsearch.index.shard.DocsStats;
80+
import org.elasticsearch.index.shard.EngineResetLock;
7981
import org.elasticsearch.index.shard.IndexShard;
8082
import org.elasticsearch.index.shard.ShardFieldStats;
8183
import org.elasticsearch.index.shard.ShardId;
@@ -1288,7 +1290,7 @@ public void externalRefresh(String source, ActionListener<Engine.RefreshResult>
12881290

12891291
/**
12901292
* Asynchronously refreshes the engine for new search operations to reflect the latest
1291-
* changes unless another thread is already refreshing the engine concurrently.
1293+
* changes unless another thread is already refreshing or reseting the engine concurrently.
12921294
*/
12931295
@Nullable
12941296
public abstract void maybeRefresh(String source, ActionListener<RefreshResult> listener) throws EngineException;
@@ -2371,7 +2373,7 @@ public record FlushResult(boolean flushPerformed, long generation) {
23712373
}
23722374

23732375
/**
2374-
* Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}.
2376+
* Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(Consumer<Engine>)}.
23752377
*
23762378
* In general, resetting the engine should be done with care, to consider any
23772379
* in-progress operations and listeners (e.g., primary term and generation listeners).
@@ -2384,4 +2386,36 @@ public void prepareForEngineReset() throws IOException {
23842386
public long getLastUnsafeSegmentGenerationForGets() {
23852387
throw new UnsupportedOperationException("Doesn't support getting the latest segment generation");
23862388
}
2389+
2390+
protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wrapForAssertions(
2391+
R referenceManager,
2392+
EngineConfig engineConfig
2393+
) {
2394+
if (Assertions.ENABLED) {
2395+
referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineResetLock()));
2396+
}
2397+
return referenceManager;
2398+
}
2399+
2400+
/**
2401+
* RefreshListener that asserts that the engine read lock is held by the thread refreshing the reference.
2402+
*/
2403+
private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener {
2404+
2405+
private final EngineResetLock engineLock;
2406+
2407+
private AssertRefreshListenerHoldsEngineReadLock(EngineResetLock engineLock) {
2408+
this.engineLock = Objects.requireNonNull(engineLock);
2409+
}
2410+
2411+
@Override
2412+
public void beforeRefresh() throws IOException {
2413+
assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread();
2414+
}
2415+
2416+
@Override
2417+
public void afterRefresh(boolean didRefresh) throws IOException {
2418+
assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread();
2419+
}
2420+
}
23872421
}

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.index.codec.CodecService;
3030
import org.elasticsearch.index.mapper.MapperService;
3131
import org.elasticsearch.index.seqno.RetentionLeases;
32+
import org.elasticsearch.index.shard.EngineResetLock;
3233
import org.elasticsearch.index.shard.ShardId;
3334
import org.elasticsearch.index.store.Store;
3435
import org.elasticsearch.index.translog.TranslogConfig;
@@ -146,6 +147,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
146147

147148
private final boolean promotableToPrimary;
148149

150+
private final EngineResetLock engineResetLock;
151+
149152
/**
150153
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
151154
*/
@@ -177,7 +180,8 @@ public EngineConfig(
177180
LongSupplier relativeTimeInNanosSupplier,
178181
Engine.IndexCommitListener indexCommitListener,
179182
boolean promotableToPrimary,
180-
MapperService mapperService
183+
MapperService mapperService,
184+
EngineResetLock engineResetLock
181185
) {
182186
this.shardId = shardId;
183187
this.indexSettings = indexSettings;
@@ -224,6 +228,7 @@ public EngineConfig(
224228
this.promotableToPrimary = promotableToPrimary;
225229
// always use compound on flush - reduces # of file-handles on refresh
226230
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
231+
this.engineResetLock = engineResetLock;
227232
}
228233

229234
/**
@@ -468,4 +473,8 @@ public boolean getUseCompoundFile() {
468473
public MapperService getMapperService() {
469474
return mapperService;
470475
}
476+
477+
public EngineResetLock getEngineResetLock() {
478+
return engineResetLock;
479+
}
471480
}

0 commit comments

Comments
 (0)