Skip to content

Commit cd8f24f

Browse files
committed
acquire read lock before refresh
1 parent b102065 commit cd8f24f

File tree

2 files changed

+54
-6
lines changed

2 files changed

+54
-6
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.common.util.concurrent.ReleasableLock;
5353
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
5454
import org.elasticsearch.core.AbstractRefCounted;
55+
import org.elasticsearch.core.Assertions;
5556
import org.elasticsearch.core.CheckedRunnable;
5657
import org.elasticsearch.core.Nullable;
5758
import org.elasticsearch.core.RefCounted;
@@ -2384,4 +2385,33 @@ public void prepareForEngineReset() throws IOException {
23842385
public long getLastUnsafeSegmentGenerationForGets() {
23852386
throw new UnsupportedOperationException("Doesn't support getting the latest segment generation");
23862387
}
2388+
2389+
/**
 * Registers an {@link AssertRefreshListenerHoldsEngineReadLock} on the given reference manager when
 * {@code Assertions.ENABLED} is true; otherwise the manager is left untouched.
 * <p>
 * Despite the name, no wrapper object is created: the listener is added in place and the very same
 * instance is returned, so callers may keep using either reference.
 *
 * @param referenceManager the reference manager on which the asserting refresh listener is registered
 * @param engineConfig     the engine configuration whose {@code getEngineLock()} supplies the lock to check
 * @return the {@code referenceManager} instance that was passed in
 */
protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wrapForAssertions(R referenceManager, EngineConfig engineConfig) {
2390+
if (Assertions.ENABLED) {
2391+
referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineLock()));
2392+
}
2393+
return referenceManager;
2394+
}
2395+
2396+
/**
2397+
* RefreshListener that asserts that the engine read lock is held by the thread refreshing the reference.
2398+
*/
2399+
private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener {
2400+
2401+
// Lock whose read side is expected to be held by the thread invoking the refresh callbacks.
private final EngineReadWriteLock engineLock;
2402+
2403+
// Rejects a null lock at construction time rather than at the first refresh callback.
private AssertRefreshListenerHoldsEngineReadLock(EngineReadWriteLock engineLock) {
2404+
this.engineLock = Objects.requireNonNull(engineLock);
2405+
}
2406+
2407+
@Override
2408+
public void beforeRefresh() throws IOException {
2409+
// The current thread is used as the assertion message so a failure identifies the offending thread.
assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread();
2410+
}
2411+
2412+
@Override
2413+
public void afterRefresh(boolean didRefresh) throws IOException {
2414+
// Same check as in beforeRefresh; it runs regardless of the value of didRefresh.
assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread();
2415+
}
2416+
}
23872417
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,8 @@ public InternalEngine(EngineConfig engineConfig) {
301301
}
302302
externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig));
303303
internalReaderManager = externalReaderManager.internalReaderManager;
304-
this.internalReaderManager = internalReaderManager;
305-
this.externalReaderManager = externalReaderManager;
304+
this.internalReaderManager = wrapForAssertions(internalReaderManager, engineConfig);
305+
this.externalReaderManager = wrapForAssertions(externalReaderManager, engineConfig);
306306
internalReaderManager.addListener(versionMap);
307307
this.lastUnsafeSegmentGenerationForGets = new AtomicLong(lastCommittedSegmentInfos.getGeneration());
308308
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
@@ -2040,7 +2040,7 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea
20402040
// both refresh types will result in an internal refresh but only the external will also
20412041
// pass the new reader reference to the external reader manager.
20422042
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
2043-
boolean refreshed;
2043+
boolean refreshed = false;
20442044
long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
20452045
try {
20462046
// refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
@@ -2051,12 +2051,30 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea
20512051
// the second refresh will only do the extra work we have to do for warming caches etc.
20522052
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
20532053
long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
2054+
2055+
// The shard uses a reentrant read/write lock to guard against engine changes, a type of lock that prioritizes the threads
2056+
// waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners
2057+
// are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the
2058+
// engine write lock, so we acquire the read lock upfront before the refresh lock.
2059+
final var engineReadLock = engineConfig.getEngineLock().readLock();
2060+
20542061
// it is intentional that we never refresh both internal / external together
20552062
if (block) {
2056-
referenceManager.maybeRefreshBlocking();
2057-
refreshed = true;
2063+
engineReadLock.lock();
2064+
try {
2065+
referenceManager.maybeRefreshBlocking();
2066+
refreshed = true;
2067+
} finally {
2068+
engineReadLock.unlock();
2069+
}
20582070
} else {
2059-
refreshed = referenceManager.maybeRefresh();
2071+
if (engineReadLock.tryLock()) {
2072+
try {
2073+
refreshed = referenceManager.maybeRefresh();
2074+
} finally {
2075+
engineReadLock.unlock();
2076+
}
2077+
}
20602078
}
20612079
if (refreshed) {
20622080
final ElasticsearchDirectoryReader current = referenceManager.acquire();

0 commit comments

Comments
 (0)