diff --git a/server/src/main/java/org/elasticsearch/index/shard/EngineRef.java b/server/src/main/java/org/elasticsearch/index/shard/EngineRef.java new file mode 100644 index 0000000000000..8d3637a6ea012 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/EngineRef.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.shard; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.index.engine.Engine; + +public final class EngineRef extends AbstractRefCounted implements Releasable { + + private final Releasable releasable; + private final Engine engine; + + public EngineRef(Engine engine, Releasable releasable) { + this.engine = engine; + this.releasable = releasable; + } + + @Nullable + public Engine getEngineOrNull() { + return engine; + } + + public Engine getEngine() { + var engine = getEngineOrNull(); + if (engine == null) { + throw new AlreadyClosedException("engine is closed"); + } + return engine; + } + + @Override + protected void closeInternal() { + releasable.close(); + } + + @Override + public void close() { + decRef(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/EngineReferenceManager.java b/server/src/main/java/org/elasticsearch/index/shard/EngineReferenceManager.java new file mode 100644 index 0000000000000..5ecaff49c3694 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/EngineReferenceManager.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.engine.Engine; + +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +class EngineReferenceManager { + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair + private final Releasable releaseExclusiveLock = () -> lock.writeLock().unlock(); // reuse this to avoid allocation for each op + private final Releasable releaseLock = () -> lock.readLock().unlock(); // reuse this to avoid allocation for each op + + private volatile Engine current; + + /** + * @return a releasable reference to a given {@link Engine}, preventing it to be changed until the reference is released. Note that the + * {@link Engine} referenced by the {@link EngineRef} may be null and may be closed at anytime in case of an engine failure, but is + * guaranteed to not be changed (and closed) by an engine reset. + */ + public EngineRef getEngineRef() { + lock.readLock().lock(); + return new EngineRef(this.current, Releasables.assertOnce(releaseLock)); + } + + /** + * Acquires a lock that prevents the {@link Engine} to be changed until the returned releasable is released. + * @return + */ + Releasable acquireEngineLock() { + lock.writeLock().lock(); + return Releasables.assertOnce(releaseExclusiveLock); + } + + boolean isEngineLockHeldByCurrentThread() { + return lock.writeLock().isHeldByCurrentThread(); + } + + /** + * @return the (possibly null) current reference to the {@link Engine} + */ + @Nullable + Engine get() { + return this.current; + } + + @Nullable + Engine getEngineAndSet(Supplier supplier) { + assert supplier != null : "supplier cannot be null"; + assert isEngineLockHeldByCurrentThread(); + Engine previous = this.current; + this.current = supplier.get(); + return previous; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d56d7471d498e..2c9c17d3193db 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // ensure happens-before relation between addRefreshListener() and postRecovery() private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex - private final AtomicReference currentEngineReference = new AtomicReference<>(); + + private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager(); final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -1277,10 +1277,13 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function if (indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) { throw new IllegalStateException("get operations not allowed on a legacy index"); } - if (translogOnly) { - return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + try (var engineRef = engineReferenceManager.getEngineRef()) { + var engine = engineRef.getEngine(); + if (translogOnly) { + return engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + } + return engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); } - return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); } /** @@ -1613,10 +1616,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - synchronized (engineMutex) { + try (var engineRef = engineReferenceManager.getEngineRef()) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - final Engine engine = getEngineOrNull(); + final Engine engine = engineRef.getEngineOrNull(); if (engine != null) { indexCommit = engine.acquireLastIndexCommit(false); } @@ -1776,14 +1779,14 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { - synchronized (engineMutex) { + try (var ignored = engineReferenceManager.acquireEngineLock()) { try { synchronized (mutex) { changeState(IndexShardState.CLOSED, reason); } checkAndCallWaitForEngineOrClosedShardListeners(); } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); + final Engine engine = engineReferenceManager.getEngineAndSet(() -> null); closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { @Override public void run() throws Exception { @@ -1857,7 +1860,7 @@ public void prepareForIndexRecovery() { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert currentEngineReference.get() == null; + assert assertEngineReferenceIsNull("prepare for recovery, engine should be null"); } /** @@ -1936,8 +1939,8 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. .newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; - synchronized (engineMutex) { - IOUtils.close(currentEngineReference.getAndSet(null)); + try (var ignored = engineReferenceManager.acquireEngineLock()) { + IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null)); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2167,13 +2170,14 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - synchronized (engineMutex) { - assert currentEngineReference.get() == null : "engine is running"; + try (var ignored = engineReferenceManager.acquireEngineLock()) { + assert assertEngineReferenceIsNull("engine is running"); verifyNotClosed(); - // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). + // we must create a new engine under lock (see IndexShard#snapshotStoreMetadata). final Engine newEngine = createEngine(config); onNewEngine(newEngine); - currentEngineReference.set(newEngine); + var previous = engineReferenceManager.getEngineAndSet(() -> newEngine); + assert previous == null; // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); @@ -2241,7 +2245,7 @@ private boolean assertLastestCommitUserData() throws IOException { } private void onNewEngine(Engine newEngine) { - assert Thread.holdsLock(engineMutex); + assert engineReferenceManager.isEngineLockHeldByCurrentThread(); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -2252,9 +2256,9 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; - synchronized (engineMutex) { + try (var ignored = engineReferenceManager.acquireEngineLock()) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(currentEngineReference.getAndSet(null)); + IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null)); resetRecoveryStage(); } } @@ -2264,7 +2268,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert currentEngineReference.get() == null; + assert assertEngineReferenceIsNull("reset recovery stage, engine should be null"); if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -3298,7 +3302,7 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { - return this.currentEngineReference.get(); + return engineReferenceManager.get(); } public void startRecovery( @@ -4312,11 +4316,11 @@ public void resetEngine() { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { - synchronized (engineMutex) { + try (var ignored = engineReferenceManager.acquireEngineLock()) { verifyNotClosed(); getEngine().prepareForEngineReset(); var newEngine = createEngine(newEngineConfig(replicationTracker)); - IOUtils.close(currentEngineReference.getAndSet(newEngine)); + IOUtils.close(engineReferenceManager.getEngineAndSet(() -> newEngine)); onNewEngine(newEngine); } onSettingsChanged(); @@ -4342,7 +4346,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - synchronized (engineMutex) { + try (var ignored = engineReferenceManager.acquireEngineLock()) { verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. @@ -4357,7 +4361,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (engineMutex) { + try (var ignored = engineReferenceManager.getEngineRef()) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } @@ -4368,7 +4372,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (engineMutex) { + try (var ignored = engineReferenceManager.getEngineRef()) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } @@ -4379,9 +4383,9 @@ public IndexCommitRef acquireSafeIndexCommit() { @Override public void close() throws IOException { Engine newEngine; - synchronized (engineMutex) { + try (var engineRef = engineReferenceManager.getEngineRef()) { newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { + if (newEngine == engineRef.getEngineOrNull()) { // we successfully installed the new engine so do not close it. newEngine = null; } @@ -4389,7 +4393,7 @@ public void close() throws IOException { IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + IOUtils.close(engineReferenceManager.getEngineAndSet(() -> readOnlyEngine)); newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); } @@ -4403,9 +4407,9 @@ public void close() throws IOException { ); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - synchronized (engineMutex) { + try (var ignored = engineReferenceManager.acquireEngineLock()) { verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); + IOUtils.close(engineReferenceManager.getEngineAndSet(newEngineReference::get)); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); @@ -4516,4 +4520,11 @@ public void ensureMutable(ActionListener listener) { l.onResponse(null); })); } + + private boolean assertEngineReferenceIsNull(String message) { + // use accessor with no lock as this asserting method can be called anywhere, + // including under the refresh lock of the index reader. + assert engineReferenceManager.get() == null : message; + return true; + } }