-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Hold engine read lock during reader refresh #125856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 125856 | ||
summary: Hold engine read lock during reader refresh | ||
area: Engine | ||
type: bug | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -315,6 +315,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl | |
// the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead | ||
private volatile long globalCheckPointIfUnpromotable; | ||
|
||
/** | ||
* Indicates that the {@link #close(String, boolean, Executor, ActionListener)} has been called | ||
*/ | ||
private final AtomicBoolean isClosing = new AtomicBoolean(); | ||
|
||
@SuppressWarnings("this-escape") | ||
public IndexShard( | ||
final ShardRouting shardRouting, | ||
|
@@ -1819,8 +1824,13 @@ public CacheHelper getReaderCacheHelper() { | |
|
||
} | ||
|
||
public boolean isClosing() { | ||
return isClosing.get(); | ||
} | ||
|
||
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException { | ||
synchronized (closeMutex) { | ||
isClosing.set(true); | ||
Engine engineOrNull = null; | ||
try { | ||
// engine reference and shard state are changed under the engine write lock | ||
|
@@ -1852,8 +1862,8 @@ public void close(String reason, boolean flushEngine, Executor closeExecutor, Ac | |
@Override | ||
public void run() throws Exception { | ||
try { | ||
assert engineLock.isWriteLockedByCurrentThread() == false : "do not close engine while holding write lock"; | ||
if (engine != null && flushEngine) { | ||
assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock"; | ||
engine.flushAndClose(); | ||
} | ||
} finally { | ||
|
@@ -3735,7 +3745,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { | |
relativeTimeInNanosSupplier, | ||
indexCommitListener, | ||
routingEntry().isPromotableToPrimary(), | ||
mapperService() | ||
mapperService(), | ||
engineLock.readLock() | ||
); | ||
} | ||
|
||
|
@@ -4470,41 +4481,26 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) { | |
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; | ||
assert waitForEngineOrClosedShardListeners.isDone(); | ||
try { | ||
engineLock.readLock().lock(); | ||
var release = true; | ||
Engine previousEngine = null; | ||
engineLock.writeLock().lock(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Really nice simplification out of this work 🎉 |
||
try { | ||
verifyNotClosed(); | ||
// Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because | ||
// another thread might already be refreshing, and the refresh listeners in that thread will need to access to the engine | ||
// using the read lock. If we were using the write lock here, it would deadlock. | ||
currentEngine.prepareForEngineReset(); | ||
engineLock.readLock().unlock(); | ||
release = false; | ||
|
||
// Promote to write lock in order to swap engines | ||
engineLock.writeLock().lock(); | ||
Engine previousEngine = null; | ||
var newEngine = createEngine(newEngineConfig(replicationTracker)); | ||
previousEngine = getAndSetCurrentEngine(newEngine); | ||
postResetNewEngineConsumer.accept(newEngine); | ||
onNewEngine(newEngine); | ||
} finally { | ||
// Downgrade to read lock for closing the engine | ||
engineLock.readLock().lock(); | ||
try { | ||
|
||
// How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not | ||
// blocking all operations when resetting the engine nor we are blocking flushes or force-merges. | ||
|
||
assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!"; | ||
var newEngine = createEngine(newEngineConfig(replicationTracker)); | ||
previousEngine = getAndSetCurrentEngine(newEngine); | ||
postResetNewEngineConsumer.accept(newEngine); | ||
onNewEngine(newEngine); | ||
engineLock.writeLock().unlock(); | ||
// Some engine implementations use a references counting mechanism to avoid closing the engine until all operations | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for adding the comment 👍 |
||
// requiring the engine to be open to run are completed (and in order to ensure this open state, the operations | ||
// acquire a reference before running). In case an operation requires to access the engine read lock during | ||
// execution, it is important that we don't hold the engine write lock here otherwise it might deadlock. | ||
IOUtils.close(previousEngine); | ||
} finally { | ||
engineLock.readLock().lock(); | ||
try { | ||
engineLock.writeLock().unlock(); | ||
IOUtils.close(previousEngine); | ||
} finally { | ||
engineLock.readLock().unlock(); | ||
} | ||
} | ||
} finally { | ||
if (release) { | ||
engineLock.readLock().unlock(); | ||
} | ||
} | ||
|
@@ -4515,6 +4511,11 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) { | |
} | ||
} | ||
|
||
// Some engine implementations use a references counting mechanism to avoid closing the engine until all operations | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this is duplicated? |
||
// requiring the engine to be open to run are completed (and in order to ensure this open state, the operations | ||
// acquire a reference before running). In case an operation requires to access the engine read lock during | ||
// execution, it is important that we don't hold the engine write lock here otherwise it might deadlock. | ||
|
||
/** | ||
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why we needed to introduce this new flag?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only to help with some Engine.Warmer logic that is executed by the refresh listener and cannot abort early if the shard is closing.