Skip to content

Commit 4547a47

Browse files
committed
Hold engine read lock during reader refresh
Relates elastic#124635
1 parent 1d6c6a5 commit 4547a47

File tree

11 files changed

+244
-53
lines changed

11 files changed

+244
-53
lines changed

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8484
config.getRelativeTimeInNanosSupplier(),
8585
config.getIndexCommitListener(),
8686
config.isPromotableToPrimary(),
87-
config.getMapperService()
87+
config.getMapperService(),
88+
config.getMaybeRefreshLock()
8889
);
8990
}
9091

server/src/main/java/org/elasticsearch/index/IndexWarmer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final class IndexWarmer {
4747
}
4848

4949
void warm(ElasticsearchDirectoryReader reader, IndexShard shard, IndexSettings settings) {
50-
if (shard.state() == IndexShardState.CLOSED) {
50+
if (shard.state() == IndexShardState.CLOSED || shard.isClosing()) {
5151
return;
5252
}
5353
if (settings.isWarmerEnabled() == false) {
@@ -117,6 +117,9 @@ public TerminationHandle warmReader(final IndexShard indexShard, final Elasticse
117117
for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) {
118118
executor.execute(() -> {
119119
try {
120+
if (indexShard.isClosing()) {
121+
return;
122+
}
120123
final long start = System.nanoTime();
121124
IndexFieldData.Global<?> ifd = indexFieldDataService.getForField(
122125
fieldType,

server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin
272272
return TerminationHandle.NO_WAIT;
273273
}
274274

275-
if (loadRandomAccessFiltersEagerly == false) {
275+
if (loadRandomAccessFiltersEagerly == false || indexShard.isClosing()) {
276276
return TerminationHandle.NO_WAIT;
277277
}
278278

@@ -291,7 +291,9 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin
291291
executor.execute(() -> {
292292
try {
293293
final long start = System.nanoTime();
294-
getAndLoadIfNotPresent(filterToWarm, ctx);
294+
if (indexShard.isClosing() == false) {
295+
getAndLoadIfNotPresent(filterToWarm, ctx);
296+
}
295297
if (indexShard.warmerService().logger().isTraceEnabled()) {
296298
indexShard.warmerService()
297299
.logger()

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Comparator;
4141
import java.util.List;
4242
import java.util.Objects;
43+
import java.util.concurrent.locks.Lock;
4344
import java.util.function.LongSupplier;
4445
import java.util.function.Supplier;
4546

@@ -146,6 +147,11 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
146147

147148
private final boolean promotableToPrimary;
148149

150+
/**
151+
* Lock to acquire before executing refreshing a reader instance.
152+
*/
153+
private final Lock maybeRefreshLock;
154+
149155
/**
150156
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
151157
*/
@@ -177,7 +183,8 @@ public EngineConfig(
177183
LongSupplier relativeTimeInNanosSupplier,
178184
Engine.IndexCommitListener indexCommitListener,
179185
boolean promotableToPrimary,
180-
MapperService mapperService
186+
MapperService mapperService,
187+
Lock maybeRefreshLock
181188
) {
182189
this.shardId = shardId;
183190
this.indexSettings = indexSettings;
@@ -224,6 +231,7 @@ public EngineConfig(
224231
this.promotableToPrimary = promotableToPrimary;
225232
// always use compound on flush - reduces # of file-handles on refresh
226233
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
234+
this.maybeRefreshLock = maybeRefreshLock;
227235
}
228236

229237
/**
@@ -468,4 +476,8 @@ public boolean getUseCompoundFile() {
468476
public MapperService getMapperService() {
469477
return mapperService;
470478
}
479+
480+
public Lock getMaybeRefreshLock() {
481+
return maybeRefreshLock;
482+
}
471483
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,12 +2051,22 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea
20512051
// the second refresh will only do the extra work we have to do for warming caches etc.
20522052
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
20532053
long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
2054-
// it is intentional that we never refresh both internal / external together
2055-
if (block) {
2056-
referenceManager.maybeRefreshBlocking();
2057-
refreshed = true;
2058-
} else {
2059-
refreshed = referenceManager.maybeRefresh();
2054+
2055+
// acquire the engine read lock before trying to acquire the reader reference manager's refresh lock, so that the
2056+
// refresh is guaranteed to complete if it tries accessing the engine for refreshing while there is a thread waiting
2057+
// for the engine write lock.
2058+
final var maybeRefreshLock = engineConfig.getMaybeRefreshLock();
2059+
maybeRefreshLock.lock();
2060+
try {
2061+
// it is intentional that we never refresh both internal / external together
2062+
if (block) {
2063+
referenceManager.maybeRefreshBlocking();
2064+
refreshed = true;
2065+
} else {
2066+
refreshed = referenceManager.maybeRefresh();
2067+
}
2068+
} finally {
2069+
maybeRefreshLock.unlock();
20602070
}
20612071
if (refreshed) {
20622072
final ElasticsearchDirectoryReader current = referenceManager.acquire();

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
315315
// the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead
316316
private volatile long globalCheckPointIfUnpromotable;
317317

318+
/**
319+
* Indicates that the {@link #close(String, boolean, Executor, ActionListener)} has been called
320+
*/
321+
private final AtomicBoolean isClosing = new AtomicBoolean();
322+
318323
@SuppressWarnings("this-escape")
319324
public IndexShard(
320325
final ShardRouting shardRouting,
@@ -1819,8 +1824,13 @@ public CacheHelper getReaderCacheHelper() {
18191824

18201825
}
18211826

1827+
public boolean isClosing() {
1828+
return isClosing.get();
1829+
}
1830+
18221831
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
18231832
synchronized (closeMutex) {
1833+
isClosing.set(true);
18241834
Engine engineOrNull = null;
18251835
try {
18261836
// engine reference and shard state are changed under the engine write lock
@@ -1852,8 +1862,8 @@ public void close(String reason, boolean flushEngine, Executor closeExecutor, Ac
18521862
@Override
18531863
public void run() throws Exception {
18541864
try {
1865+
assert engineLock.isWriteLockedByCurrentThread() == false : "do not close engine while holding write lock";
18551866
if (engine != null && flushEngine) {
1856-
assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock";
18571867
engine.flushAndClose();
18581868
}
18591869
} finally {
@@ -3735,7 +3745,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
37353745
relativeTimeInNanosSupplier,
37363746
indexCommitListener,
37373747
routingEntry().isPromotableToPrimary(),
3738-
mapperService()
3748+
mapperService(),
3749+
engineLock.readLock()
37393750
);
37403751
}
37413752

@@ -4470,41 +4481,26 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
44704481
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
44714482
assert waitForEngineOrClosedShardListeners.isDone();
44724483
try {
4473-
engineLock.readLock().lock();
4474-
var release = true;
4484+
Engine previousEngine = null;
4485+
engineLock.writeLock().lock();
44754486
try {
44764487
verifyNotClosed();
4477-
// Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because
4478-
// another thread might already be refreshing, and the refresh listeners in that thread will need to access to the engine
4479-
// using the read lock. If we were using the write lock here, it would deadlock.
44804488
currentEngine.prepareForEngineReset();
4481-
engineLock.readLock().unlock();
4482-
release = false;
4483-
4484-
// Promote to write lock in order to swap engines
4485-
engineLock.writeLock().lock();
4486-
Engine previousEngine = null;
4489+
var newEngine = createEngine(newEngineConfig(replicationTracker));
4490+
previousEngine = getAndSetCurrentEngine(newEngine);
4491+
postResetNewEngineConsumer.accept(newEngine);
4492+
onNewEngine(newEngine);
4493+
} finally {
4494+
// Downgrade to read lock for closing the engine
4495+
engineLock.readLock().lock();
44874496
try {
4488-
4489-
// How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not
4490-
// blocking all operations when resetting the engine nor we are blocking flushes or force-merges.
4491-
4492-
assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!";
4493-
var newEngine = createEngine(newEngineConfig(replicationTracker));
4494-
previousEngine = getAndSetCurrentEngine(newEngine);
4495-
postResetNewEngineConsumer.accept(newEngine);
4496-
onNewEngine(newEngine);
4497+
engineLock.writeLock().unlock();
4498+
// Some engine implementations use a references counting mechanism to avoid closing the engine until all operations
4499+
// requiring the engine to be open to run are completed (and in order to ensure this open state, the operations
4500+
// acquire a reference before running). In case an operation requires to access the engine read lock during
4501+
// execution, it is important that we don't hold the engine write lock here otherwise it might deadlock.
4502+
IOUtils.close(previousEngine);
44974503
} finally {
4498-
engineLock.readLock().lock();
4499-
try {
4500-
engineLock.writeLock().unlock();
4501-
IOUtils.close(previousEngine);
4502-
} finally {
4503-
engineLock.readLock().unlock();
4504-
}
4505-
}
4506-
} finally {
4507-
if (release) {
45084504
engineLock.readLock().unlock();
45094505
}
45104506
}
@@ -4515,6 +4511,11 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
45154511
}
45164512
}
45174513

4514+
// Some engine implementations use a references counting mechanism to avoid closing the engine until all operations
4515+
// requiring the engine to be open to run are completed (and in order to ensure this open state, the operations
4516+
// acquire a reference before running). In case an operation requires to access the engine read lock during
4517+
// execution, it is important that we don't hold the engine write lock here otherwise it might deadlock.
4518+
45184519
/**
45194520
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
45204521
*/

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3612,7 +3612,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
36123612
config.getRelativeTimeInNanosSupplier(),
36133613
null,
36143614
true,
3615-
config.getMapperService()
3615+
config.getMapperService(),
3616+
config.getMaybeRefreshLock()
36163617
);
36173618
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
36183619

@@ -7175,7 +7176,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
71757176
config.getRelativeTimeInNanosSupplier(),
71767177
config.getIndexCommitListener(),
71777178
config.isPromotableToPrimary(),
7178-
config.getMapperService()
7179+
config.getMapperService(),
7180+
config.getMaybeRefreshLock()
71797181
);
71807182
try (InternalEngine engine = createEngine(configWithWarmer)) {
71817183
assertThat(warmedUpReaders, empty());

0 commit comments

Comments
 (0)