Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ protected final void ensureOpen() {
ensureOpen(null);
}

/**
 * Reports whether this engine is operable. This implementation unconditionally returns {@code true}.
 * NOTE(review): presumably overridden by engine implementations that must be reset before they can be used — confirm
 * against the subclasses.
 *
 * @return {@code true}, always, for this implementation
 */
public boolean isOperable() {
return true;
}

/** get commits stats for the last commit */
public final CommitStats commitStats() {
return new CommitStats(getLastCommittedSegmentInfos());
Expand Down
179 changes: 134 additions & 45 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -240,8 +241,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// ensure happens-before relation between addRefreshListener() and postRecovery()
private volatile SubscribableListener<Void> postRecoveryComplete;
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();

private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); // lock ordering: engineLock.writeLock -> mutex
private Engine currentEngine = null;
final EngineFactory engineFactory;

private final IndexingOperationListener indexingOperationListeners;
Expand Down Expand Up @@ -1613,18 +1615,14 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
synchronized (engineMutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
final Engine engine = getEngineOrNull();
if (engine != null) {
indexCommit = engine.acquireLastIndexCommit(false);
}
if (indexCommit == null) {
return store.getMetadata(null, true);
}
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
indexCommit = withEngineOrNull(engine -> engine != null ? engine.acquireLastIndexCommit(false) : null);
if (indexCommit != null) {
return store.getMetadata(indexCommit.getIndexCommit());
} else {
return store.getMetadata(null, true);
}
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
IOUtils.close(indexCommit);
Expand Down Expand Up @@ -1776,14 +1774,15 @@ public CacheHelper getReaderCacheHelper() {
}

public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
synchronized (engineMutex) {
engineLock.writeLock().lock();
try {
synchronized (mutex) {
changeState(IndexShardState.CLOSED, reason);
}
checkAndCallWaitForEngineOrClosedShardListeners();
} finally {
try {
synchronized (mutex) {
changeState(IndexShardState.CLOSED, reason);
}
checkAndCallWaitForEngineOrClosedShardListeners();
} finally {
final Engine engine = this.currentEngineReference.getAndSet(null);
final Engine engine = getAndSetCurrentEngine(null);
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
@Override
public void run() throws Exception {
Expand All @@ -1809,6 +1808,8 @@ public String toString() {
return "IndexShard#close[" + shardId + "]";
}
}));
} finally {
engineLock.writeLock().unlock();
}
}
}
Expand Down Expand Up @@ -1857,7 +1858,7 @@ public void prepareForIndexRecovery() {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.setStage(RecoveryState.Stage.INDEX);
assert currentEngineReference.get() == null;
assert this.currentEngine == null;
}

/**
Expand Down Expand Up @@ -1936,8 +1937,11 @@ private void doLocalRecovery(
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
synchronized (engineMutex) {
IOUtils.close(currentEngineReference.getAndSet(null));
engineLock.writeLock().lock();
try {
IOUtils.close(getAndSetCurrentEngine(null));
} finally {
engineLock.writeLock().unlock();
}
}, (recoveryCompleteListener, ignoredRef) -> {
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
Expand Down Expand Up @@ -2167,16 +2171,19 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
+ recoveryState.getRecoverySource()
+ "] but got "
+ getRetentionLeases();
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
engineLock.writeLock().lock();
try {
assert this.currentEngine == null : "engine is running";
verifyNotClosed();
// we must create a new engine under the engineLock write lock (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = createEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
getAndSetCurrentEngine(newEngine);
// We set active because we are now writing operations to the engine; this way,
// we can flush if we go idle after some time and become inactive.
active.set(true);
} finally {
engineLock.writeLock().unlock();
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
Expand Down Expand Up @@ -2241,7 +2248,7 @@ private boolean assertLastestCommitUserData() throws IOException {
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
assert engineLock.isWriteLockedByCurrentThread();
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
Expand All @@ -2252,10 +2259,13 @@ private void onNewEngine(Engine newEngine) {
*/
public void performRecoveryRestart() throws IOException {
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
synchronized (engineMutex) {
engineLock.writeLock().lock();
try {
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
IOUtils.close(currentEngineReference.getAndSet(null));
IOUtils.close(getAndSetCurrentEngine(null));
resetRecoveryStage();
} finally {
engineLock.writeLock().unlock();
}
}

Expand All @@ -2264,7 +2274,7 @@ public void performRecoveryRestart() throws IOException {
*/
public void resetRecoveryStage() {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert currentEngineReference.get() == null;
assert this.currentEngine == null;
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Expand Down Expand Up @@ -3286,19 +3296,80 @@ private void doCheckIndex() throws IOException {
}

Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
engineLock.readLock().lock();
try {
return getCurrentEngine(false);
} finally {
engineLock.readLock().unlock();
}
return engine;
}

/**
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
* closed.
*/
public Engine getEngineOrNull() {
return this.currentEngineReference.get();
engineLock.readLock().lock();
try {
return getCurrentEngine(true);
} finally {
engineLock.readLock().unlock();
}
}

/**
 * Applies {@code operation} to the current engine and returns its result. An engine is required: this delegates to the
 * two-argument overload with {@code allowNoEngine} set to {@code false}.
 *
 * @param operation function to apply to the current engine
 * @return the value produced by {@code operation}
 */
public <R> R withEngine(Function<Engine, R> operation) {
    final boolean allowNoEngine = false;
    return withEngine(operation, allowNoEngine);
}

/**
 * Applies {@code operation} to the current engine and returns its result. A missing engine is tolerated: this delegates to
 * the two-argument overload with {@code allowNoEngine} set to {@code true}, so {@code operation} may receive {@code null}.
 *
 * @param operation function to apply to the current engine, which may be {@code null}
 * @return the value produced by {@code operation}
 */
public <R> R withEngineOrNull(Function<Engine, R> operation) {
    final boolean allowNoEngine = true;
    return withEngine(operation, allowNoEngine);
}

/**
 * Applies {@code operation} to the current engine while holding the read lock of {@code engineLock}, and returns its result.
 * <p>
 * If the current engine reports {@code isOperable() == false}, the read lock is released and the write lock is taken. The
 * engine is then re-checked and, if it is still not operable, {@link #resetEngine()} is called. The read lock is re-acquired
 * before the write lock is released, so {@code operation} always runs under the read lock.
 *
 * @param operation     function to apply; receives {@code null} when there is no engine and {@code allowNoEngine} is true
 * @param allowNoEngine forwarded to {@code getCurrentEngine}: when {@code false}, a missing engine raises
 *                      {@code AlreadyClosedException} instead of yielding {@code null}
 * @return the value produced by {@code operation}
 */
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
assert operation != null;
engineLock.readLock().lock();
// tracks whether this thread currently holds the read lock, i.e. whether the outer finally must release it
var release = true;
try {
var engine = getCurrentEngine(allowNoEngine);
if (engine != null && engine.isOperable() == false) {
// a ReentrantReadWriteLock cannot be upgraded from read to write, so the read lock is dropped first
engineLock.readLock().unlock();
release = false;
engineLock.writeLock().lock();
try {
// re-read under the write lock: the engine may have been swapped while no lock was held
engine = getCurrentEngine(allowNoEngine);
if (engine != null && engine.isOperable() == false) {
resetEngine();
engine = getCurrentEngine(allowNoEngine);
}
// downgrade: take the read lock before the write lock is released in the finally below
engineLock.readLock().lock();
release = true;
} finally {
engineLock.writeLock().unlock();
}
}
assert engine == null || engine.isOperable();
return operation.apply(engine);
} finally {
// skipped only when the write-locked section failed before the read lock was re-acquired
if (release) {
engineLock.readLock().unlock();
}
}
}

/**
 * Returns the shard's current engine. The calling thread must hold either the read lock or the write lock of
 * {@code engineLock}.
 *
 * @param allowNoEngine when {@code true}, a missing engine is returned as {@code null}; when {@code false}, it is an error
 * @return the current engine, or {@code null} if there is none and {@code allowNoEngine} is {@code true}
 * @throws AlreadyClosedException if there is no current engine and {@code allowNoEngine} is {@code false}
 */
private Engine getCurrentEngine(boolean allowNoEngine) {
    // Either lock is sufficient for a consistent read. Checking only the read hold count would trip for callers that
    // hold just the write lock, as withEngine does when it re-reads the engine after giving up its read lock.
    assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread()
        : "engineLock must be held (read or write) to access the current engine";
    final Engine engine = this.currentEngine;
    if (engine == null && allowNoEngine == false) {
        throw new AlreadyClosedException("engine is closed");
    }
    return engine;
}

/**
 * Installs {@code newEngine} as the shard's current engine and hands back the engine it replaces. The calling thread must
 * hold the write lock of {@code engineLock}. The replaced engine is not closed here; that is left to the caller.
 *
 * @param newEngine the engine to install, or {@code null} to clear the current engine
 * @return the engine that was current before this call, or {@code null} if there was none
 */
private Engine getAndSetCurrentEngine(Engine newEngine) {
    assert engineLock.isWriteLockedByCurrentThread();
    final Engine replaced = this.currentEngine;
    this.currentEngine = newEngine;
    return replaced;
}

public void startRecovery(
Expand Down Expand Up @@ -4312,12 +4383,15 @@ public void resetEngine() {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert waitForEngineOrClosedShardListeners.isDone();
try {
synchronized (engineMutex) {
engineLock.writeLock().lock(); // might already be held
try {
verifyNotClosed();
getEngine().prepareForEngineReset();
var newEngine = createEngine(newEngineConfig(replicationTracker));
IOUtils.close(currentEngineReference.getAndSet(newEngine));
IOUtils.close(getAndSetCurrentEngine(newEngine));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only do this if not operable, since it could be swapped by a reading thread already?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could indeed already have been swapped. But since resetEngine is used to transition from hollow to unhollow and vice versa, I had to add an operability parameter to indicate which state we expect the new engine to be in.

See 1ddac5e

onNewEngine(newEngine);
} finally {
engineLock.writeLock().unlock();
}
onSettingsChanged();
} catch (Exception e) {
Expand All @@ -4342,7 +4416,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
SetOnce<Engine> newEngineReference = new SetOnce<>();
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
synchronized (engineMutex) {
engineLock.writeLock().lock();
try {
verifyNotClosed();
// we must create both new read-only engine and new read-write engine under the engineLock write lock to ensure
// snapshotStoreMetadata, acquireXXXCommit and close work.
Expand All @@ -4357,41 +4432,52 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (engineMutex) {
engineLock.readLock().lock();
try {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
return newEngineReference.get().acquireLastIndexCommit(false);
} finally {
engineLock.readLock().unlock();
}
}

@Override
public IndexCommitRef acquireSafeIndexCommit() {
synchronized (engineMutex) {
engineLock.readLock().lock();
try {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
return newEngineReference.get().acquireSafeIndexCommit();
} finally {
engineLock.readLock().unlock();
}
}

@Override
public void close() throws IOException {
Engine newEngine;
synchronized (engineMutex) {
engineLock.readLock().lock();
try {
newEngine = newEngineReference.get();
if (newEngine == currentEngineReference.get()) {
if (newEngine == getCurrentEngine(true)) {
// we successfully installed the new engine so do not close it.
newEngine = null;
}
} finally {
engineLock.readLock().unlock();
}
IOUtils.close(super::close, newEngine);
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
IOUtils.close(getAndSetCurrentEngine(readOnlyEngine));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
} finally {
engineLock.writeLock().unlock();
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine,
Expand All @@ -4403,12 +4489,15 @@ public void close() throws IOException {
);
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
newEngineReference.get().refresh("reset_engine");
synchronized (engineMutex) {
engineLock.writeLock().lock();
try {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
IOUtils.close(getAndSetCurrentEngine(newEngineReference.get()));
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
} finally {
engineLock.writeLock().unlock();
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
Expand Down