Skip to content

Commit 23a5ff2

Browse files
committed
Use lock for resets only
1 parent cd8f24f commit 23a5ff2

File tree

12 files changed

+316
-248
lines changed

12 files changed

+316
-248
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti
676676
final CountDownLatch engineResetLatch = new CountDownLatch(1);
677677
shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
678678
try {
679-
shard.resetEngineToGlobalCheckpoint();
679+
shard.rollbackEngineToGlobalCheckpoint();
680680
} finally {
681681
r.close();
682682
engineResetLatch.countDown();

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8585
config.getIndexCommitListener(),
8686
config.isPromotableToPrimary(),
8787
config.getMapperService(),
88-
config.getEngineLock()
88+
config.getEngineResetLock()
8989
);
9090
}
9191

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2386,9 +2386,12 @@ public long getLastUnsafeSegmentGenerationForGets() {
23862386
throw new UnsupportedOperationException("Doesn't support getting the latest segment generation");
23872387
}
23882388

2389-
protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wrapForAssertions(R referenceManager, EngineConfig engineConfig) {
2389+
protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wrapForAssertions(
2390+
R referenceManager,
2391+
EngineConfig engineConfig
2392+
) {
23902393
if (Assertions.ENABLED) {
2391-
referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineLock()));
2394+
referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineResetLock()));
23922395
}
23932396
return referenceManager;
23942397
}
@@ -2398,9 +2401,9 @@ protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wr
23982401
*/
23992402
private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener {
24002403

2401-
private final EngineReadWriteLock engineLock;
2404+
private final EngineResetLock engineLock;
24022405

2403-
private AssertRefreshListenerHoldsEngineReadLock(EngineReadWriteLock engineLock) {
2406+
private AssertRefreshListenerHoldsEngineReadLock(EngineResetLock engineLock) {
24042407
this.engineLock = Objects.requireNonNull(engineLock);
24052408
}
24062409

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
146146

147147
private final boolean promotableToPrimary;
148148

149-
private final EngineReadWriteLock engineLock;
149+
private final EngineResetLock engineResetLock;
150150

151151
/**
152152
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
@@ -180,7 +180,7 @@ public EngineConfig(
180180
Engine.IndexCommitListener indexCommitListener,
181181
boolean promotableToPrimary,
182182
MapperService mapperService,
183-
EngineReadWriteLock engineLock
183+
EngineResetLock engineResetLock
184184
) {
185185
this.shardId = shardId;
186186
this.indexSettings = indexSettings;
@@ -227,7 +227,7 @@ public EngineConfig(
227227
this.promotableToPrimary = promotableToPrimary;
228228
// always use compound on flush - reduces # of file-handles on refresh
229229
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
230-
this.engineLock = engineLock;
230+
this.engineResetLock = engineResetLock;
231231
}
232232

233233
/**
@@ -473,7 +473,7 @@ public MapperService getMapperService() {
473473
return mapperService;
474474
}
475475

476-
public EngineReadWriteLock getEngineLock() {
477-
return engineLock;
476+
public EngineResetLock getEngineResetLock() {
477+
return engineResetLock;
478478
}
479479
}

server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java renamed to server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,25 @@
99

1010
package org.elasticsearch.index.engine;
1111

12+
import org.elasticsearch.core.Assertions;
13+
14+
import java.util.Collection;
15+
import java.util.List;
1216
import java.util.concurrent.locks.Lock;
1317
import java.util.concurrent.locks.ReadWriteLock;
1418
import java.util.concurrent.locks.ReentrantReadWriteLock;
1519

1620
/**
17-
* Reentrant read/write lock used to guard engine changes in a shard.
21+
* Reentrant read/write lock used to control accesses to a shard's engine that can be reset.
1822
*
19-
* Implemented as a simple wrapper around a {@link ReentrantReadWriteLock} to make it easier to add/override methods in the future.
23+
* Implemented as a simple wrapper around a {@link ReentrantReadWriteLock} to make it easier to add/override methods.
2024
*/
21-
public final class EngineReadWriteLock implements ReadWriteLock {
25+
public final class EngineResetLock implements ReadWriteLock {
2226

2327
private final ReentrantReadWriteLock lock;
2428

25-
public EngineReadWriteLock() {
26-
this.lock = new ReentrantReadWriteLock();
29+
public EngineResetLock() {
30+
this.lock = Assertions.ENABLED ? new QueuedWriterThreadsReentrantReadWriteLock() : new ReentrantReadWriteLock();
2731
}
2832

2933
@Override
@@ -69,4 +73,24 @@ public boolean isReadLocked() {
6973
public boolean isReadLockedByCurrentThread() {
7074
return lock.getReadHoldCount() > 0;
7175
}
76+
77+
/**
78+
* See {@link ReentrantReadWriteLock#getQueuedWriterThreads()}
79+
*/
80+
public Collection<Thread> getQueuedWriterThreads() {
81+
if (lock instanceof QueuedWriterThreadsReentrantReadWriteLock queuedLock) {
82+
return queuedLock.queuedWriterThreads();
83+
} else {
84+
return List.of();
85+
}
86+
}
87+
88+
/**
89+
* Extends ReentrantReadWriteLock to expose the protected {@link ReentrantReadWriteLock#getQueuedWriterThreads()} method
90+
*/
91+
private static class QueuedWriterThreadsReentrantReadWriteLock extends ReentrantReadWriteLock {
92+
Collection<Thread> queuedWriterThreads() {
93+
return super.getQueuedWriterThreads();
94+
}
95+
}
7296
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2056,7 +2056,7 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea
20562056
// waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners
20572057
// are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the
20582058
// engine write lock, so we acquire the read lock upfront before the refresh lock.
2059-
final var engineReadLock = engineConfig.getEngineLock().readLock();
2059+
final var engineReadLock = engineConfig.getEngineResetLock().readLock();
20602060

20612061
// it is intentional that we never refresh both internal / external together
20622062
if (block) {

0 commit comments

Comments
 (0)