Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/125856.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125856
summary: Hold engine read lock during reader refresh
area: Engine
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService()
config.getMapperService(),
config.getMaybeRefreshLock()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final class IndexWarmer {
}

void warm(ElasticsearchDirectoryReader reader, IndexShard shard, IndexSettings settings) {
if (shard.state() == IndexShardState.CLOSED) {
if (shard.state() == IndexShardState.CLOSED || shard.isClosing()) {
return;
}
if (settings.isWarmerEnabled() == false) {
Expand Down Expand Up @@ -117,6 +117,9 @@ public TerminationHandle warmReader(final IndexShard indexShard, final Elasticse
for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) {
executor.execute(() -> {
try {
if (indexShard.isClosing()) {
return;
}
final long start = System.nanoTime();
IndexFieldData.Global<?> ifd = indexFieldDataService.getForField(
fieldType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin
return TerminationHandle.NO_WAIT;
}

if (loadRandomAccessFiltersEagerly == false) {
if (loadRandomAccessFiltersEagerly == false || indexShard.isClosing()) {
return TerminationHandle.NO_WAIT;
}

Expand All @@ -291,7 +291,9 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin
executor.execute(() -> {
try {
final long start = System.nanoTime();
getAndLoadIfNotPresent(filterToWarm, ctx);
if (indexShard.isClosing() == false) {
getAndLoadIfNotPresent(filterToWarm, ctx);
}
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService()
.logger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -146,6 +147,11 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final boolean promotableToPrimary;

/**
* Lock to acquire before refreshing a reader instance.
*/
private final Lock maybeRefreshLock;

/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
Expand Down Expand Up @@ -177,7 +183,8 @@ public EngineConfig(
LongSupplier relativeTimeInNanosSupplier,
Engine.IndexCommitListener indexCommitListener,
boolean promotableToPrimary,
MapperService mapperService
MapperService mapperService,
Lock maybeRefreshLock
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -224,6 +231,7 @@ public EngineConfig(
this.promotableToPrimary = promotableToPrimary;
// always use compound on flush - reduces # of file-handles on refresh
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
this.maybeRefreshLock = maybeRefreshLock;
}

/**
Expand Down Expand Up @@ -468,4 +476,8 @@ public boolean getUseCompoundFile() {
/**
 * Returns the {@link MapperService} stored in this engine configuration.
 *
 * @return the value of the {@code mapperService} field
 */
public MapperService getMapperService() {
return this.mapperService;
}

/**
 * Returns the lock that must be acquired before refreshing a reader instance.
 *
 * @return the {@link Lock} that was passed to this configuration's constructor
 */
public Lock getMaybeRefreshLock() {
return this.maybeRefreshLock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2051,12 +2051,22 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea
// the second refresh will only do the extra work we have to do for warming caches etc.
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
// it is intentional that we never refresh both internal / external together
if (block) {
referenceManager.maybeRefreshBlocking();
refreshed = true;
} else {
refreshed = referenceManager.maybeRefresh();

// acquire the engine read lock before trying to acquire the reader reference manager's refresh lock, so that the
// refresh is guaranteed to complete if it tries accessing the engine for refreshing while there is a thread waiting
// for the engine write lock.
final var maybeRefreshLock = engineConfig.getMaybeRefreshLock();
maybeRefreshLock.lock();
try {
// it is intentional that we never refresh both internal / external together
if (block) {
referenceManager.maybeRefreshBlocking();
refreshed = true;
} else {
refreshed = referenceManager.maybeRefresh();
}
} finally {
maybeRefreshLock.unlock();
}
if (refreshed) {
final ElasticsearchDirectoryReader current = referenceManager.acquire();
Expand Down
65 changes: 33 additions & 32 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead
private volatile long globalCheckPointIfUnpromotable;

/**
* Indicates that {@link #close(String, boolean, Executor, ActionListener)} has been called.
*/
private final AtomicBoolean isClosing = new AtomicBoolean();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why we needed to introduce this new flag?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only to help with some Engine.Warmer logic that is executed by the refresh listener and cannot abort early if the shard is closing.


@SuppressWarnings("this-escape")
public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -1819,8 +1824,13 @@ public CacheHelper getReaderCacheHelper() {

}

/**
 * Reports whether closing of this shard has started.
 *
 * @return the current value of the {@code isClosing} flag, which
 *         {@link #close(String, boolean, Executor, ActionListener)} sets to {@code true} on entry
 */
public boolean isClosing() {
return this.isClosing.get();
}

public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
synchronized (closeMutex) {
isClosing.set(true);
Engine engineOrNull = null;
try {
// engine reference and shard state are changed under the engine write lock
Expand Down Expand Up @@ -1852,8 +1862,8 @@ public void close(String reason, boolean flushEngine, Executor closeExecutor, Ac
@Override
public void run() throws Exception {
try {
assert engineLock.isWriteLockedByCurrentThread() == false : "do not close engine while holding write lock";
if (engine != null && flushEngine) {
assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock";
engine.flushAndClose();
}
} finally {
Expand Down Expand Up @@ -3735,7 +3745,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
relativeTimeInNanosSupplier,
indexCommitListener,
routingEntry().isPromotableToPrimary(),
mapperService()
mapperService(),
engineLock.readLock()
);
}

Expand Down Expand Up @@ -4470,41 +4481,26 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert waitForEngineOrClosedShardListeners.isDone();
try {
engineLock.readLock().lock();
var release = true;
Engine previousEngine = null;
engineLock.writeLock().lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice simplification out of this work 🎉

try {
verifyNotClosed();
// Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because
// another thread might already be refreshing, and the refresh listeners in that thread will need to access the engine
// using the read lock. If we were using the write lock here, it would deadlock.
currentEngine.prepareForEngineReset();
engineLock.readLock().unlock();
release = false;

// Promote to write lock in order to swap engines
engineLock.writeLock().lock();
Engine previousEngine = null;
var newEngine = createEngine(newEngineConfig(replicationTracker));
previousEngine = getAndSetCurrentEngine(newEngine);
postResetNewEngineConsumer.accept(newEngine);
onNewEngine(newEngine);
} finally {
// Downgrade to read lock for closing the engine
engineLock.readLock().lock();
try {

// How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not
// blocking all operations when resetting the engine, nor are we blocking flushes or force-merges.

assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!";
var newEngine = createEngine(newEngineConfig(replicationTracker));
previousEngine = getAndSetCurrentEngine(newEngine);
postResetNewEngineConsumer.accept(newEngine);
onNewEngine(newEngine);
engineLock.writeLock().unlock();
// Some engine implementations use a reference counting mechanism to avoid closing the engine until all operations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the comment 👍

// requiring the engine to be open to run are completed (and in order to ensure this open state, the operations
// acquire a reference before running). In case an operation requires access to the engine read lock during
// execution, it is important that we don't hold the engine write lock here otherwise it might deadlock.
IOUtils.close(previousEngine);
} finally {
engineLock.readLock().lock();
try {
engineLock.writeLock().unlock();
IOUtils.close(previousEngine);
} finally {
engineLock.readLock().unlock();
}
}
} finally {
if (release) {
engineLock.readLock().unlock();
}
}
Expand All @@ -4515,6 +4511,11 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
}
}

// Some engine implementations use a reference counting mechanism to avoid closing the engine until all operations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is duplicated?

// requiring the engine to be open to run are completed (and in order to ensure this open state, the operations
// acquire a reference before running). In case an operation requires access to the engine read lock during
// execution, it is important that we don't hold the engine write lock here otherwise it might deadlock.

/**
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3612,7 +3612,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
config.getRelativeTimeInNanosSupplier(),
null,
true,
config.getMapperService()
config.getMapperService(),
config.getMaybeRefreshLock()
);
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));

Expand Down Expand Up @@ -7175,7 +7176,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService()
config.getMapperService(),
config.getMaybeRefreshLock()
);
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());
Expand Down
Loading
Loading