From b852e3715f8ae0a55ea16d5ff0e7e19c001b77ab Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 28 Feb 2025 10:48:05 +0100 Subject: [PATCH 01/14] [Test] Add IndexShard.withEngine method Alternative of #122749, this draft adds an `IndexShard.withEngine` method based on a reentrant lock that can be used to execute an operation while preventing any engine change during execution. --- .../elasticsearch/index/engine/Engine.java | 4 + .../elasticsearch/index/shard/IndexShard.java | 179 +++++++++++++----- 2 files changed, 138 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 3298d8757ca92..451cf2b3d2bad 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -989,6 +989,10 @@ protected final void ensureOpen() { ensureOpen(null); } + public boolean isOperable() { + return true; + } + /** get commits stats for the last commit */ public final CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7fecc53826ff1..ffe7a7967600a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -178,6 +178,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -240,8 +241,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // ensure happens-before relation between addRefreshListener() and postRecovery() private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex - private final AtomicReference currentEngineReference = new AtomicReference<>(); + + private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); // lock ordering: engineLock.writeLock -> mutex + private Engine currentEngine = null; final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -1613,18 +1615,14 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - synchronized (engineMutex) { - // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } - if (indexCommit == null) { - return store.getMetadata(null, true); - } + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. + indexCommit = withEngineOrNull(engine -> engine != null ? engine.acquireLastIndexCommit(false) : null); + if (indexCommit != null) { + return store.getMetadata(indexCommit.getIndexCommit()); + } else { + return store.getMetadata(null, true); } - return store.getMetadata(indexCommit.getIndexCommit()); } finally { store.decRef(); IOUtils.close(indexCommit); @@ -1776,14 +1774,15 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); + } + checkAndCallWaitForEngineOrClosedShardListeners(); + } finally { try { - synchronized (mutex) { - changeState(IndexShardState.CLOSED, reason); - } - checkAndCallWaitForEngineOrClosedShardListeners(); - } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); + final Engine engine = getAndSetCurrentEngine(null); closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { @Override public void run() throws Exception { @@ -1809,6 +1808,8 @@ public String toString() { return "IndexShard#close[" + shardId + "]"; } })); + } finally { + engineLock.writeLock().unlock(); } } } @@ -1857,7 +1858,7 @@ public void prepareForIndexRecovery() { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert currentEngineReference.get() == null; + assert this.currentEngine == null; } /** @@ -1936,8 +1937,11 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. .newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; - synchronized (engineMutex) { - IOUtils.close(currentEngineReference.getAndSet(null)); + engineLock.writeLock().lock(); + try { + IOUtils.close(getAndSetCurrentEngine(null)); + } finally { + engineLock.writeLock().unlock(); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2167,16 +2171,19 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - synchronized (engineMutex) { - assert currentEngineReference.get() == null : "engine is running"; + engineLock.writeLock().lock(); + try { + assert this.currentEngine == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = createEngine(config); onNewEngine(newEngine); - currentEngineReference.set(newEngine); + getAndSetCurrentEngine(newEngine); // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -2241,7 +2248,7 @@ private boolean assertLastestCommitUserData() throws IOException { } private void onNewEngine(Engine newEngine) { - assert Thread.holdsLock(engineMutex); + assert engineLock.isWriteLockedByCurrentThread(); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -2252,10 +2259,13 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(currentEngineReference.getAndSet(null)); + IOUtils.close(getAndSetCurrentEngine(null)); resetRecoveryStage(); + } finally { + engineLock.writeLock().unlock(); } } @@ -2264,7 +2274,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert currentEngineReference.get() == null; + assert this.currentEngine == null; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -3286,11 +3296,12 @@ private void doCheckIndex() throws IOException { } Engine getEngine() { - Engine engine = getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine is closed"); + engineLock.readLock().lock(); + try { + return getCurrentEngine(false); + } finally { + engineLock.readLock().unlock(); } - return engine; } /** @@ -3298,7 +3309,67 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { - return this.currentEngineReference.get(); + engineLock.readLock().lock(); + try { + return getCurrentEngine(true); + } finally { + engineLock.readLock().unlock(); + } + } + + public R withEngine(Function operation) { + return withEngine(operation, false); + } + + public R withEngineOrNull(Function operation) { + return withEngine(operation, true); + } + + private R withEngine(Function operation, boolean allowNoEngine) { + assert operation != null; + engineLock.readLock().lock(); + var release = true; + try { + var engine = getCurrentEngine(allowNoEngine); + if (engine != null && engine.isOperable() == false) { + engineLock.readLock().unlock(); + release = false; + engineLock.writeLock().lock(); + try { + engine = getCurrentEngine(allowNoEngine); + if (engine != null && engine.isOperable() == false) { + resetEngine(); + engine = getCurrentEngine(allowNoEngine); + } + engineLock.readLock().lock(); + release = true; + } finally { + engineLock.writeLock().unlock(); + } + } + assert engine == null || engine.isOperable(); + return operation.apply(engine); + } finally { + if (release) { + engineLock.readLock().unlock(); + } + } + } + + private Engine getCurrentEngine(boolean allowNoEngine) { + assert engineLock.getReadHoldCount() > 0; + var engine = this.currentEngine; + if (engine == null && allowNoEngine == false) { + throw new AlreadyClosedException("engine is closed"); + } + return engine; + } + + private Engine getAndSetCurrentEngine(Engine newEngine) { + assert engineLock.isWriteLockedByCurrentThread(); + var previousEngine = this.currentEngine; + this.currentEngine = newEngine; + return previousEngine; } public void startRecovery( @@ -4312,12 +4383,15 @@ public void resetEngine() { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { - synchronized (engineMutex) { + engineLock.writeLock().lock(); // might already be held + try { verifyNotClosed(); getEngine().prepareForEngineReset(); var newEngine = createEngine(newEngineConfig(replicationTracker)); - IOUtils.close(currentEngineReference.getAndSet(newEngine)); + IOUtils.close(getAndSetCurrentEngine(newEngine)); onNewEngine(newEngine); + } finally { + engineLock.writeLock().unlock(); } onSettingsChanged(); } catch (Exception e) { @@ -4342,7 +4416,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. @@ -4357,41 +4432,52 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); + } finally { + engineLock.readLock().unlock(); } } @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } return newEngineReference.get().acquireSafeIndexCommit(); + } finally { + engineLock.readLock().unlock(); } } @Override public void close() throws IOException { Engine newEngine; - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { + if (newEngine == getCurrentEngine(true)) { // we successfully installed the new engine so do not close it. newEngine = null; } + } finally { + engineLock.readLock().unlock(); } IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); + } finally { + engineLock.writeLock().unlock(); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, @@ -4403,12 +4489,15 @@ public void close() throws IOException { ); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); + IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. From c9185ed75c6c7d2ab18f3112acb51d180b3c3ef3 Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 28 Feb 2025 11:45:55 +0100 Subject: [PATCH 02/14] snapshotStoreMetadata --- .../elasticsearch/index/shard/IndexShard.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ffe7a7967600a..24e4243759b12 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1615,14 +1615,21 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - indexCommit = withEngineOrNull(engine -> engine != null ? engine.acquireLastIndexCommit(false) : null); - if (indexCommit != null) { - return store.getMetadata(indexCommit.getIndexCommit()); - } else { - return store.getMetadata(null, true); + engineLock.writeLock().lock(); + try { + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. + final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); + } + if (indexCommit == null) { + return store.getMetadata(null, true); + } + } finally { + engineLock.writeLock().unlock(); } + return store.getMetadata(indexCommit.getIndexCommit()); } finally { store.decRef(); IOUtils.close(indexCommit); From 6b4de97c3340fde7a0030981c35b875021df24b6 Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 28 Feb 2025 11:55:15 +0100 Subject: [PATCH 03/14] assert threads --- .../java/org/elasticsearch/index/shard/IndexShard.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 24e4243759b12..b66f0b3ffe261 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -45,6 +45,8 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -152,6 +154,7 @@ import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -3333,7 +3336,11 @@ public R withEngineOrNull(Function operation) { } private R withEngine(Function operation, boolean allowNoEngine) { + assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); + assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); + assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); assert operation != null; + engineLock.readLock().lock(); var release = true; try { From 1ddac5e1a3b64473384a49de9717851ad2e3910f Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 28 Feb 2025 13:57:45 +0100 Subject: [PATCH 04/14] reset operability --- .../org/elasticsearch/index/engine/Engine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 17 ++++++++++------- .../index/shard/IndexShardTests.java | 12 ++++++++++-- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 451cf2b3d2bad..731dbd125149d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -2345,7 +2345,7 @@ public record FlushResult(boolean flushPerformed, long generation) { } /** - * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}. + * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(boolean)}. * * In general, resetting the engine should be done with care, to consider any * in-progress operations and listeners (e.g., primary term and generation listeners). diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b66f0b3ffe261..19cf50cea78bf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -246,7 +246,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); // lock ordering: engineLock.writeLock -> mutex - private Engine currentEngine = null; + private Engine currentEngine = null; // must be accessed while holding engineLock final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -3352,7 +3352,7 @@ private R withEngine(Function operation, boolean allowNoEngine) { try { engine = getCurrentEngine(allowNoEngine); if (engine != null && engine.isOperable() == false) { - resetEngine(); + resetEngine(true); engine = getCurrentEngine(allowNoEngine); } engineLock.readLock().lock(); @@ -4393,17 +4393,20 @@ public void afterRefresh(boolean didRefresh) { * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. */ - public void resetEngine() { + public void resetEngine(boolean operability) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { engineLock.writeLock().lock(); // might already be held try { verifyNotClosed(); - getEngine().prepareForEngineReset(); - var newEngine = createEngine(newEngineConfig(replicationTracker)); - IOUtils.close(getAndSetCurrentEngine(newEngine)); - onNewEngine(newEngine); + if (currentEngine.isOperable() != operability) { + currentEngine.prepareForEngineReset(); + var newEngine = createEngine(newEngineConfig(replicationTracker)); + assert newEngine.isOperable() == operability : newEngine.isOperable() + " != " + operability; + IOUtils.close(getAndSetCurrentEngine(newEngine)); + onNewEngine(newEngine); + } } finally { engineLock.writeLock().unlock(); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 975565b73a0d6..12decb9c5fbe3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4577,11 +4577,19 @@ public void testResetEngineToGlobalCheckpoint() throws Exception { public void testResetEngine() throws Exception { var newEngineCreated = new CountDownLatch(2); + final AtomicBoolean shared = new AtomicBoolean(); var indexShard = newStartedShard(true, Settings.EMPTY, config -> { try { return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true) { @Override - public void prepareForEngineReset() throws IOException {} + public void prepareForEngineReset() { + shared.set(true); + } + + @Override + public boolean isOperable() { + return shared.get(); + } }; } finally { newEngineCreated.countDown(); @@ -4593,7 +4601,7 @@ public void prepareForEngineReset() throws IOException {} var onAcquired = new PlainActionFuture(); indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L)); try (var permits = safeGet(onAcquired)) { - indexShard.resetEngine(); + indexShard.resetEngine(true); } safeAwait(newEngineCreated); safeAwait(newEngineNotification); From 2b2523f4bf8df3fed624ecd8dde4b0bd858453a6 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 28 Feb 2025 13:06:05 +0000 Subject: [PATCH 05/14] [CI] Auto commit changes from spotless --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 19cf50cea78bf..a1126a7e6f953 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4403,7 +4403,7 @@ public void resetEngine(boolean operability) { if (currentEngine.isOperable() != operability) { currentEngine.prepareForEngineReset(); var newEngine = createEngine(newEngineConfig(replicationTracker)); - assert newEngine.isOperable() == operability : newEngine.isOperable() + " != " + operability; + assert newEngine.isOperable() == operability : newEngine.isOperable() + " != " + operability; IOUtils.close(getAndSetCurrentEngine(newEngine)); onNewEngine(newEngine); } From bba90822188c2406d8529dc84c348bdcd97ddb70 Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 28 Feb 2025 20:43:26 +0100 Subject: [PATCH 06/14] let's see if that works --- ...csearchDirectoryReaderRefreshListener.java | 58 +++++++++++++++++++ .../engine/ReaderAwareRefreshListener.java | 25 ++++++++ 2 files changed, 83 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/ElasticsearchDirectoryReaderRefreshListener.java create mode 100644 server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchDirectoryReaderRefreshListener.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchDirectoryReaderRefreshListener.java new file mode 100644 index 0000000000000..61dae902b00f9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchDirectoryReaderRefreshListener.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class ElasticsearchDirectoryReaderRefreshListener { + + private final ReferenceManager readerManager; + private final List listeners; + + public ElasticsearchDirectoryReaderRefreshListener( + ReferenceManager readerManager, + List listeners + ) { + this.readerManager = Objects.requireNonNull(readerManager); + this.listeners = List.copyOf(listeners); + this.readerManager.addListener(new InternalRefreshListener()); + } + + private class InternalRefreshListener implements ReferenceManager.RefreshListener { + + @Override + public void beforeRefresh() throws IOException { + for (var listener : listeners) { + listener.beforeRefresh(); + } + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + var reader = readerManager.acquire(); + try { + for (var listener : listeners) { + if (listener instanceof ReaderAwareRefreshListener l) { + l.afterRefresh(didRefresh, reader); + } else { + listener.afterRefresh(didRefresh); + } + } + } finally { + readerManager.release(reader); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java b/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java new file mode 100644 index 0000000000000..45b5f2adb9af9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; + +import java.io.IOException; + +public interface ReaderAwareRefreshListener extends ReferenceManager.RefreshListener { + + @Override + default void afterRefresh(boolean didRefresh) throws IOException { + } + + void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException; + +} From 45cbd16b28de140887b25566a64dc5a310946000 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 28 Feb 2025 19:52:10 +0000 Subject: [PATCH 07/14] [CI] Auto commit changes from spotless --- .../elasticsearch/index/engine/ReaderAwareRefreshListener.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java b/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java index 45b5f2adb9af9..3f99359dffb1a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java @@ -17,8 +17,7 @@ public interface ReaderAwareRefreshListener extends ReferenceManager.RefreshListener { @Override - default void afterRefresh(boolean didRefresh) throws IOException { - } + default void afterRefresh(boolean didRefresh) throws IOException {} void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException; From fdce2a407394296db825cdb86ee8bb18db7a5cb1 Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 28 Feb 2025 22:00:48 +0100 Subject: [PATCH 08/14] let's see if that works --- .../index/engine/InternalEngine.java | 12 ++-- .../elasticsearch/index/shard/IndexShard.java | 57 +++++++++---------- 2 files changed, 33 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fc7f6eab0856c..063993017688b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -304,12 +304,12 @@ public InternalEngine(EngineConfig engineConfig) { assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; // don't allow commits until we are done with recovering pendingTranslogRecovery.set(true); - for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { - this.externalReaderManager.addListener(listener); - } - for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { - this.internalReaderManager.addListener(listener); - } + new ElasticsearchDirectoryReaderRefreshListener(externalReaderManager, engineConfig.getExternalRefreshListener()); // add itself + // as + // listener + new ElasticsearchDirectoryReaderRefreshListener(internalReaderManager, engineConfig.getInternalRefreshListener()); // add itself + // as + // listener this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a1126a7e6f953..bc57eabae117a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -94,6 +94,7 @@ import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; +import org.elasticsearch.index.engine.ReaderAwareRefreshListener; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.engine.Segment; @@ -2259,6 +2260,7 @@ private boolean assertLastestCommitUserData() throws IOException { private void onNewEngine(Engine newEngine) { assert engineLock.isWriteLockedByCurrentThread(); + refreshPendingLocationListener.setTranslogLastWriteLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -4168,12 +4170,17 @@ private void setRefreshPending(Engine engine) { } private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener { + Supplier supplier; Translog.Location lastWriteLocation; + public void setTranslogLastWriteLocationSupplier(Supplier translogLastWriteLocation) { + this.supplier = translogLastWriteLocation; + } + @Override public void beforeRefresh() { try { - lastWriteLocation = getEngine().getTranslogLastWriteLocation(); + lastWriteLocation = supplier.get(); } catch (AlreadyClosedException exc) { // shard is closed - no location is fine lastWriteLocation = null; @@ -4194,18 +4201,14 @@ public void afterRefresh(boolean didRefresh) { } } - private class RefreshFieldHasValueListener implements ReferenceManager.RefreshListener { + private class RefreshFieldHasValueListener implements ReaderAwareRefreshListener { @Override public void beforeRefresh() {} @Override - public void afterRefresh(boolean didRefresh) { + public void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException { if (enableFieldHasValue && (didRefresh || fieldInfos == FieldInfos.EMPTY)) { - try (Engine.Searcher hasValueSearcher = getEngine().acquireSearcher("field_has_value")) { - setFieldInfos(FieldInfos.getMergedFieldInfos(hasValueSearcher.getIndexReader())); - } catch (AlreadyClosedException ignore) { - // engine is closed - no updated FieldInfos is fine - } + setFieldInfos(FieldInfos.getMergedFieldInfos(reader.getContext().reader())); } } } @@ -4218,35 +4221,29 @@ public ShardFieldStats getShardFieldStats() { return shardFieldStats; } - private class RefreshShardFieldStatsListener implements ReferenceManager.RefreshListener { + private class RefreshShardFieldStatsListener implements ReaderAwareRefreshListener { @Override - public void beforeRefresh() { - - } + public void beforeRefresh() {} @Override - public void afterRefresh(boolean didRefresh) { + public void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException { if (shardFieldStats == null || didRefresh) { - try (var searcher = getEngine().acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL)) { - int numSegments = 0; - int totalFields = 0; - long usages = 0; - for (LeafReaderContext leaf : searcher.getLeafContexts()) { - numSegments++; - var fieldInfos = leaf.reader().getFieldInfos(); - totalFields += fieldInfos.size(); - if (fieldInfos instanceof FieldInfosWithUsages ft) { - if (usages != -1) { - usages += ft.getTotalUsages(); - } - } else { - usages = -1; + int numSegments = 0; + int totalFields = 0; + long usages = 0; + for (LeafReaderContext leaf : reader.getContext().leaves()) { + numSegments++; + var fieldInfos = leaf.reader().getFieldInfos(); + totalFields += fieldInfos.size(); + if (fieldInfos instanceof FieldInfosWithUsages ft) { + if (usages != -1) { + usages += ft.getTotalUsages(); } + } else { + usages = -1; } - shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages); - } catch (AlreadyClosedException ignored) { - } + shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages); } } } From 52b7f75b7c85a157686b0f04475b8062ae3a5c43 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 3 Mar 2025 11:44:28 +0100 Subject: [PATCH 09/14] getEngineOrNull with timeout --- .../elasticsearch/index/shard/IndexShard.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bc57eabae117a..85b2e597daee3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2615,7 +2615,7 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(); + Engine engineOrNull = getEngineOrNull(1, TimeUnit.SECONDS); if (engineOrNull != null) { engineOrNull.onSettingsChanged(); } @@ -3329,6 +3329,22 @@ public Engine getEngineOrNull() { } } + // TODO See how usages of this method are fixable + public Engine getEngineOrNull(long timeout, TimeUnit unit) { + try { + if (engineLock.readLock().tryLock(timeout, unit)) { + try { + return getCurrentEngine(true); + } finally { + engineLock.readLock().unlock(); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + } + public R withEngine(Function operation) { return withEngine(operation, false); } From 93f1a873e8d28dfa65051d6f9d8940fc9e779200 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 4 Mar 2025 10:35:09 +0100 Subject: [PATCH 10/14] small rewording --- .../elasticsearch/index/engine/Engine.java | 8 +- .../elasticsearch/index/shard/IndexShard.java | 160 +++++++++--------- .../index/shard/IndexShardTests.java | 10 +- 3 files changed, 94 insertions(+), 84 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 731dbd125149d..e16bca6250762 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -989,7 +989,7 @@ protected final void ensureOpen() { ensureOpen(null); } - public boolean isOperable() { + public boolean isMutable() { return true; } @@ -2351,7 +2351,11 @@ public record FlushResult(boolean flushPerformed, long generation) { * in-progress operations and listeners (e.g., primary term and generation listeners). * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. */ - public void prepareForEngineReset() throws IOException { + public void beforeReset() throws IOException { + throw new UnsupportedOperationException("does not support engine reset"); + } + + public void afterReset() throws IOException { throw new UnsupportedOperationException("does not support engine reset"); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 85b2e597daee3..ea02e82c62e6b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -562,14 +562,14 @@ public void updateShardState( && currentRouting.relocating() && replicationTracker.isRelocated() && (newRouting.relocating() == false || newRouting.equalsIgnoringMetadata(currentRouting) == false)) { - // if the shard is not in primary mode anymore (after primary relocation) we have to fail when any changes in shard - // routing occur (e.g. due to recovery failure / cancellation). The reason is that at the moment we cannot safely - // reactivate primary mode without risking two active primaries. - throw new IndexShardRelocatedException( - shardId(), - "Shard is marked as relocated, cannot safely move to state " + newRouting.state() - ); - } + // if the shard is not in primary mode anymore (after primary relocation) we have to fail when any changes in shard + // routing occur (e.g. due to recovery failure / cancellation). The reason is that at the moment we cannot safely + // reactivate primary mode without risking two active primaries. + throw new IndexShardRelocatedException( + shardId(), + "Shard is marked as relocated, cannot safely move to state " + newRouting.state() + ); + } if (newRouting.active() && state != IndexShardState.STARTED && state != IndexShardState.CLOSED) { // If cluster.no_master_block: all then we remove all shards locally whenever there's no master, but there might still be @@ -611,15 +611,15 @@ public void updateShardState( */ assert newRouting.initializing() == false : "a started primary shard should never update its term; " - + "shard " - + newRouting - + ", " - + "current term [" - + pendingPrimaryTerm - + "], " - + "new term [" - + newPrimaryTerm - + "]"; + + "shard " + + newRouting + + ", " + + "current term [" + + pendingPrimaryTerm + + "], " + + "new term [" + + newPrimaryTerm + + "]"; assert newPrimaryTerm > pendingPrimaryTerm : "primary terms can only go up; current term [" + pendingPrimaryTerm + "], new term [" + newPrimaryTerm + "]"; /* @@ -636,14 +636,14 @@ public void updateShardState( shardStateUpdated.await(); assert pendingPrimaryTerm == newPrimaryTerm : "shard term changed on primary. expected [" - + newPrimaryTerm - + "] but was [" - + pendingPrimaryTerm - + "]" - + ", current routing: " - + currentRouting - + ", new routing: " - + newRouting; + + newPrimaryTerm + + "] but was [" + + pendingPrimaryTerm + + "]" + + ", current routing: " + + currentRouting + + ", new routing: " + + newRouting; assert getOperationPrimaryTerm() == newPrimaryTerm; try { replicationTracker.activatePrimaryMode(getLocalCheckpoint()); @@ -706,7 +706,7 @@ public void onFailure(Exception e) { this.shardRouting = newRouting; assert this.shardRouting.primary() == false || this.shardRouting.started() == false || // note that we use started and not - // active to avoid relocating shards + // active to avoid relocating shards this.indexShardOperationPermits.isBlocked() || // if permits are blocked, we are still transitioning this.replicationTracker.isPrimaryMode() : "a started primary with non-pending operation term must be in primary mode " + this.shardRouting; @@ -866,7 +866,7 @@ public void onFailure(Exception e) { } } }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by - // CancellableThreads and we want to be able to interrupt it + // CancellableThreads and we want to be able to interrupt it } } @@ -2179,9 +2179,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t updateRetentionLeasesOnReplica(loadRetentionLeases()); assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty() : "expected empty set of retention leases with recovery source [" - + recoveryState.getRecoverySource() - + "] but got " - + getRetentionLeases(); + + recoveryState.getRecoverySource() + + "] but got " + + getRetentionLeases(); engineLock.writeLock().lock(); try { assert this.currentEngine == null : "engine is running"; @@ -2232,10 +2232,10 @@ private boolean assertLastestCommitUserData() throws IOException { assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid [" - + userData.get(Engine.HISTORY_UUID_KEY) - + "] is different than engine [" - + getHistoryUUID() - + "]"; + + userData.get(Engine.HISTORY_UUID_KEY) + + "] is different than engine [" + + getHistoryUUID() + + "]"; assert userData.containsKey(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : "opening index which was created post 5.5.0 but " + Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit"; @@ -2248,13 +2248,13 @@ private boolean assertLastestCommitUserData() throws IOException { final org.apache.lucene.util.Version commitLuceneVersion = segmentCommitInfos.getCommitLuceneVersion(); assert commitLuceneVersion.onOrAfter(RecoverySettings.SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION.luceneVersion()) == false || userData.containsKey(Engine.ES_VERSION) - && Engine.readIndexVersion(userData.get(Engine.ES_VERSION)).onOrBefore(IndexVersion.current()) + && Engine.readIndexVersion(userData.get(Engine.ES_VERSION)).onOrBefore(IndexVersion.current()) : "commit point has an invalid ES_VERSION value. commit point lucene version [" - + commitLuceneVersion - + "]," - + " ES_VERSION [" - + userData.get(Engine.ES_VERSION) - + "]"; + + commitLuceneVersion + + "]," + + " ES_VERSION [" + + userData.get(Engine.ES_VERSION) + + "]"; return true; } @@ -2615,7 +2615,7 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(1, TimeUnit.SECONDS); + Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { engineOrNull.onSettingsChanged(); } @@ -3096,11 +3096,11 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S */ assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED : "supposedly in-sync shard copy received a global checkpoint [" - + globalCheckpoint - + "] " - + "that is higher than its local checkpoint [" - + localCheckpoint - + "]"; + + globalCheckpoint + + "] " + + "that is higher than its local checkpoint [" + + localCheckpoint + + "]"; return; } replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason); @@ -3329,31 +3329,32 @@ public Engine getEngineOrNull() { } } - // TODO See how usages of this method are fixable - public Engine getEngineOrNull(long timeout, TimeUnit unit) { - try { - if (engineLock.readLock().tryLock(timeout, unit)) { - try { - return getCurrentEngine(true); - } finally { - engineLock.readLock().unlock(); - } - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; + public void withMutableEngine(Function operation) { + withEngine(operation, true, false); } - public R withEngine(Function operation) { - return withEngine(operation, false); + public void withMutableEngineOrNull(Function operation) { + withEngine(operation, true, true); } - public R withEngineOrNull(Function operation) { - return withEngine(operation, true); + public void withImmutableEngine(Function operation) { + withEngine(operation, false, false); } - private R withEngine(Function operation, boolean allowNoEngine) { + /** + * Executes an operation while preventing the shard's engine instance to be changed or closed during the execution. The parameter + * {@code requiredMutability} can be used to force the engine to be reset to a given mutable or immutable state before executing the + * operation. The parameter {@code allowNoEngine} is used to allow the operation to be executed with a null engine instance, in which + * case the {@code requiredMutability} is ignored. When {@code allowNoEngine} is set to {@code `false`} the method will throw an + * {@link AlreadyClosedException} if the current engine is null and won't reset the engine to the required mutability state. + * + * @param operation + * @param requiredMutability + * @param allowNoEngine + * @return + * @param + */ + private R withEngine(Function operation, boolean requiredMutability, boolean allowNoEngine) { assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); @@ -3363,14 +3364,14 @@ private R withEngine(Function operation, boolean allowNoEngine) { var release = true; try { var engine = getCurrentEngine(allowNoEngine); - if (engine != null && engine.isOperable() == false) { + if (engine != null && (engine.isMutable() == requiredMutability) == false) { engineLock.readLock().unlock(); release = false; engineLock.writeLock().lock(); try { engine = getCurrentEngine(allowNoEngine); - if (engine != null && engine.isOperable() == false) { - resetEngine(true); + if (engine != null && (engine.isMutable() == requiredMutability) == false) { + resetEngine(requiredMutability); engine = getCurrentEngine(allowNoEngine); } engineLock.readLock().lock(); @@ -3379,7 +3380,7 @@ private R withEngine(Function operation, boolean allowNoEngine) { engineLock.writeLock().unlock(); } } - assert engine == null || engine.isOperable(); + assert engine == null || engine.isMutable() == requiredMutability; return operation.apply(engine); } finally { if (release) { @@ -3389,7 +3390,7 @@ private R withEngine(Function operation, boolean allowNoEngine) { } private Engine getCurrentEngine(boolean allowNoEngine) { - assert engineLock.getReadHoldCount() > 0; + assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread(); var engine = this.currentEngine; if (engine == null && allowNoEngine == false) { throw new AlreadyClosedException("engine is closed"); @@ -4387,10 +4388,10 @@ public void afterRefresh(boolean didRefresh) { assert callingThread != null : "afterRefresh called but not beforeRefresh"; assert callingThread == Thread.currentThread() : "beforeRefreshed called by a different thread. current [" - + Thread.currentThread().getName() - + "], thread that called beforeRefresh [" - + callingThread.getName() - + "]"; + + Thread.currentThread().getName() + + "], thread that called beforeRefresh [" + + callingThread.getName() + + "]"; callingThread = null; } refreshMetric.inc(System.nanoTime() - currentRefreshStartTime); @@ -4400,25 +4401,26 @@ public void afterRefresh(boolean didRefresh) { /** * Reset the current engine to a new one. * - * Calls {@link Engine#prepareForEngineReset()} on the current engine, then closes it, and loads a new engine without + * Calls {@link Engine#beforeReset()} on the current engine, then closes it, and loads a new engine without * doing any translog recovery. * * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. */ - public void resetEngine(boolean operability) { + private void resetEngine(boolean mutability) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { engineLock.writeLock().lock(); // might already be held try { verifyNotClosed(); - if (currentEngine.isOperable() != operability) { - currentEngine.prepareForEngineReset(); + if (currentEngine.isMutable() != mutability) { + currentEngine.beforeReset(); var newEngine = createEngine(newEngineConfig(replicationTracker)); - assert newEngine.isOperable() == operability : newEngine.isOperable() + " != " + operability; + assert newEngine.isMutable() == mutability : newEngine.isMutable() + " != " + mutability; IOUtils.close(getAndSetCurrentEngine(newEngine)); onNewEngine(newEngine); + currentEngine.afterReset(); } } finally { engineLock.writeLock().unlock(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 12decb9c5fbe3..f8ae60eb324ef 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4582,14 +4582,18 @@ public void testResetEngine() throws Exception { try { return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true) { @Override - public void prepareForEngineReset() { + public void beforeReset() { shared.set(true); } @Override - public boolean isOperable() { + public boolean isMutable() { return shared.get(); } + + @Override + public void afterReset() throws IOException { + } }; } finally { newEngineCreated.countDown(); @@ -4601,7 +4605,7 @@ public boolean isOperable() { var onAcquired = new PlainActionFuture(); indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L)); try (var permits = safeGet(onAcquired)) { - indexShard.resetEngine(true); + indexShard.withMutableEngine(ignored -> ignored); } safeAwait(newEngineCreated); safeAwait(newEngineNotification); From 5eadc811c27b1ab4ebfcd1c5198e2960a83b101e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 4 Mar 2025 09:42:09 +0000 Subject: [PATCH 11/14] [CI] Auto commit changes from spotless --- .../elasticsearch/index/shard/IndexShard.java | 96 +++++++++---------- .../index/shard/IndexShardTests.java | 3 +- 2 files changed, 49 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ea02e82c62e6b..8602cbceddbea 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -562,14 +562,14 @@ public void updateShardState( && currentRouting.relocating() && replicationTracker.isRelocated() && (newRouting.relocating() == false || newRouting.equalsIgnoringMetadata(currentRouting) == false)) { - // if the shard is not in primary mode anymore (after primary relocation) we have to fail when any changes in shard - // routing occur (e.g. due to recovery failure / cancellation). The reason is that at the moment we cannot safely - // reactivate primary mode without risking two active primaries. - throw new IndexShardRelocatedException( - shardId(), - "Shard is marked as relocated, cannot safely move to state " + newRouting.state() - ); - } + // if the shard is not in primary mode anymore (after primary relocation) we have to fail when any changes in shard + // routing occur (e.g. due to recovery failure / cancellation). The reason is that at the moment we cannot safely + // reactivate primary mode without risking two active primaries. + throw new IndexShardRelocatedException( + shardId(), + "Shard is marked as relocated, cannot safely move to state " + newRouting.state() + ); + } if (newRouting.active() && state != IndexShardState.STARTED && state != IndexShardState.CLOSED) { // If cluster.no_master_block: all then we remove all shards locally whenever there's no master, but there might still be @@ -611,15 +611,15 @@ public void updateShardState( */ assert newRouting.initializing() == false : "a started primary shard should never update its term; " - + "shard " - + newRouting - + ", " - + "current term [" - + pendingPrimaryTerm - + "], " - + "new term [" - + newPrimaryTerm - + "]"; + + "shard " + + newRouting + + ", " + + "current term [" + + pendingPrimaryTerm + + "], " + + "new term [" + + newPrimaryTerm + + "]"; assert newPrimaryTerm > pendingPrimaryTerm : "primary terms can only go up; current term [" + pendingPrimaryTerm + "], new term [" + newPrimaryTerm + "]"; /* @@ -636,14 +636,14 @@ public void updateShardState( shardStateUpdated.await(); assert pendingPrimaryTerm == newPrimaryTerm : "shard term changed on primary. expected [" - + newPrimaryTerm - + "] but was [" - + pendingPrimaryTerm - + "]" - + ", current routing: " - + currentRouting - + ", new routing: " - + newRouting; + + newPrimaryTerm + + "] but was [" + + pendingPrimaryTerm + + "]" + + ", current routing: " + + currentRouting + + ", new routing: " + + newRouting; assert getOperationPrimaryTerm() == newPrimaryTerm; try { replicationTracker.activatePrimaryMode(getLocalCheckpoint()); @@ -706,7 +706,7 @@ public void onFailure(Exception e) { this.shardRouting = newRouting; assert this.shardRouting.primary() == false || this.shardRouting.started() == false || // note that we use started and not - // active to avoid relocating shards + // active to avoid relocating shards this.indexShardOperationPermits.isBlocked() || // if permits are blocked, we are still transitioning this.replicationTracker.isPrimaryMode() : "a started primary with non-pending operation term must be in primary mode " + this.shardRouting; @@ -2179,9 +2179,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t updateRetentionLeasesOnReplica(loadRetentionLeases()); assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty() : "expected empty set of retention leases with recovery source [" - + recoveryState.getRecoverySource() - + "] but got " - + getRetentionLeases(); + + recoveryState.getRecoverySource() + + "] but got " + + getRetentionLeases(); engineLock.writeLock().lock(); try { assert this.currentEngine == null : "engine is running"; @@ -2232,10 +2232,10 @@ private boolean assertLastestCommitUserData() throws IOException { assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid [" - + userData.get(Engine.HISTORY_UUID_KEY) - + "] is different than engine [" - + getHistoryUUID() - + "]"; + + userData.get(Engine.HISTORY_UUID_KEY) + + "] is different than engine [" + + getHistoryUUID() + + "]"; assert userData.containsKey(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : "opening index which was created post 5.5.0 but " + Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit"; @@ -2248,13 +2248,13 @@ private boolean assertLastestCommitUserData() throws IOException { final org.apache.lucene.util.Version commitLuceneVersion = segmentCommitInfos.getCommitLuceneVersion(); assert commitLuceneVersion.onOrAfter(RecoverySettings.SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION.luceneVersion()) == false || userData.containsKey(Engine.ES_VERSION) - && Engine.readIndexVersion(userData.get(Engine.ES_VERSION)).onOrBefore(IndexVersion.current()) + && Engine.readIndexVersion(userData.get(Engine.ES_VERSION)).onOrBefore(IndexVersion.current()) : "commit point has an invalid ES_VERSION value. commit point lucene version [" - + commitLuceneVersion - + "]," - + " ES_VERSION [" - + userData.get(Engine.ES_VERSION) - + "]"; + + commitLuceneVersion + + "]," + + " ES_VERSION [" + + userData.get(Engine.ES_VERSION) + + "]"; return true; } @@ -3096,11 +3096,11 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S */ assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED : "supposedly in-sync shard copy received a global checkpoint [" - + globalCheckpoint - + "] " - + "that is higher than its local checkpoint [" - + localCheckpoint - + "]"; + + globalCheckpoint + + "] " + + "that is higher than its local checkpoint [" + + localCheckpoint + + "]"; return; } replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason); @@ -4388,10 +4388,10 @@ public void afterRefresh(boolean didRefresh) { assert callingThread != null : "afterRefresh called but not beforeRefresh"; assert callingThread == Thread.currentThread() : "beforeRefreshed called by a different thread. current [" - + Thread.currentThread().getName() - + "], thread that called beforeRefresh [" - + callingThread.getName() - + "]"; + + Thread.currentThread().getName() + + "], thread that called beforeRefresh [" + + callingThread.getName() + + "]"; callingThread = null; } refreshMetric.inc(System.nanoTime() - currentRefreshStartTime); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f8ae60eb324ef..9fc22addb89fc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4592,8 +4592,7 @@ public boolean isMutable() { } @Override - public void afterReset() throws IOException { - } + public void afterReset() throws IOException {} }; } finally { newEngineCreated.countDown(); From 134f01cf8a705fed4e4f4fd86061578d8ecc4c70 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 4 Mar 2025 14:05:44 +0100 Subject: [PATCH 12/14] onSettingsChanged --- .../elasticsearch/index/shard/IndexShard.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8602cbceddbea..7670220e967b0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2615,9 +2615,20 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(); - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + // This method can be called within the cluster state applier thread + if (engineLock.readLock().tryLock() == false) { + // Attempt to acquire a read lock failed: + // - the engine is closing, in which case we don't need to apply the updated index settings + // - otherwise the onSettingsChanged() should be called again after the new engine is created and the write lock is released + return; + } + try { + var engineOrNull = getCurrentEngine(true); + if (engineOrNull != null) { + engineOrNull.onSettingsChanged(); + } + } finally { + engineLock.readLock().unlock(); } } From 660121326bafbeab72e34c9b86a068733574cb7c Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 4 Mar 2025 16:21:10 +0100 Subject: [PATCH 13/14] gets --- .../action/get/TransportGetAction.java | 14 ++---------- .../get/TransportShardMultiGetAction.java | 14 ++---------- .../elasticsearch/index/shard/IndexShard.java | 22 ++++++++++--------- 3 files changed, 16 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index d474eee8375b3..dd0771b0dab1a 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -237,10 +236,7 @@ private void getFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("get_from_translog failed", cause); - if (cause instanceof ShardNotFoundException - || cause instanceof IndexNotFoundException - || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 + if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { logger.debug("retrying get_from_translog"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -255,13 +251,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 9002f7eb6d053..12752a896837a 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -218,10 +217,7 @@ private void shardMultiGetFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("mget_from_translog[shard] failed", cause); - if (cause instanceof ShardNotFoundException - || cause instanceof IndexNotFoundException - || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 + if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { logger.debug("retrying mget_from_translog[shard]"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -236,13 +232,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryShardMultiGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7670220e967b0..84df22924ee92 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1283,10 +1283,12 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function if (indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) { throw new IllegalStateException("get operations not allowed on a legacy index"); } - if (translogOnly) { - return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); - } - return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + return withMutableEngine(engine -> { + if (translogOnly) { + return engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + } + return engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + }); } /** @@ -3340,16 +3342,16 @@ public Engine getEngineOrNull() { } } - public void withMutableEngine(Function operation) { - withEngine(operation, true, false); + public R withMutableEngine(Function operation) { + return withEngine(operation, true, false); } - public void withMutableEngineOrNull(Function operation) { - withEngine(operation, true, true); + public R withMutableEngineOrNull(Function operation) { + return withEngine(operation, true, true); } - public void withImmutableEngine(Function operation) { - withEngine(operation, false, false); + public R withImmutableEngine(Function operation) { + return withEngine(operation, false, false); } /** From 0ba34f4ef00f9edc08b48de689224e7ce340daa5 Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 5 Mar 2025 10:23:14 +0100 Subject: [PATCH 14/14] use ensureMutable in GetFromTranslog --- .../get/TransportGetFromTranslogAction.java | 56 ++++++----- ...ansportShardMultiGetFomTranslogAction.java | 96 ++++++++++--------- 2 files changed, 82 insertions(+), 70 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index 6fc1ff5300101..36f011e2ed2cc 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -16,15 +16,16 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.get.GetResult; @@ -49,42 +50,47 @@ public class TransportGetFromTranslogAction extends HandledTransportAction< public static final Logger logger = LogManager.getLogger(TransportGetFromTranslogAction.class); private final IndicesService indicesService; + private final ThreadPool threadPool; @Inject public TransportGetFromTranslogAction(TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) { super(NAME, transportService, actionFilters, Request::new, transportService.getThreadPool().executor(ThreadPool.Names.GET)); this.indicesService = indicesService; + this.threadPool = transportService.getThreadPool(); } @Override protected void doExecute(Task task, Request request, ActionListener listener) { final GetRequest getRequest = request.getRequest(); - final ShardId shardId = request.shardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert getRequest.realtime(); - ActionListener.completeWith(listener, () -> { - var result = indexShard.getService() - .getFromTranslog( - getRequest.id(), - getRequest.storedFields(), - getRequest.realtime(), - getRequest.version(), - getRequest.versionType(), - getRequest.fetchSourceContext(), - getRequest.isForceSyntheticSource() - ); - long segmentGeneration = -1; - if (result == null) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); + + SubscribableListener.newForked(l -> { + var indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + indexShard.ensureMutable(l.map(unused -> indexShard)); + }).andThen((l, indexShard) -> { + threadPool.executor(ThreadPool.Names.GET).execute(ActionRunnable.supply(l, () -> { + var result = indexShard.getService() + .getFromTranslog( + getRequest.id(), + getRequest.storedFields(), + getRequest.realtime(), + getRequest.version(), + getRequest.versionType(), + getRequest.fetchSourceContext(), + getRequest.isForceSyntheticSource() + ); + long segmentGeneration = -1; + if (result == null) { + Engine engine = indexShard.getEngineOrNull(); + if (engine == null) { + throw new AlreadyClosedException("engine closed"); + } + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); - } - return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); - }); + return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); + })); + }).addListener(listener); } public static class Request extends ActionRequest implements IndicesRequest { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index ec0b5c6cf143f..50fe35f20342b 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -15,13 +15,14 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -43,6 +44,7 @@ public class TransportShardMultiGetFomTranslogAction extends HandledTransportAct public static final Logger logger = LogManager.getLogger(TransportShardMultiGetFomTranslogAction.class); private final IndicesService indicesService; + private final ThreadPool threadPool; protected TransportShardMultiGetFomTranslogAction( TransportService transportService, @@ -51,60 +53,64 @@ protected TransportShardMultiGetFomTranslogAction( ) { super(NAME, transportService, actionFilters, Request::new, transportService.getThreadPool().executor(ThreadPool.Names.GET)); this.indicesService = indicesService; + this.threadPool = transportService.getThreadPool(); } @Override protected void doExecute(Task task, Request request, ActionListener listener) { var multiGetShardRequest = request.getMultiGetShardRequest(); - var shardId = request.getShardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert multiGetShardRequest.realtime(); - ActionListener.completeWith(listener, () -> { - var multiGetShardResponse = new MultiGetShardResponse(); - var someItemsNotFoundInTranslog = false; - for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { - var item = multiGetShardRequest.items.get(i); - try { - var result = indexShard.getService() - .getFromTranslog( - item.id(), - item.storedFields(), - multiGetShardRequest.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - multiGetShardRequest.isForceSyntheticSource() + + SubscribableListener.newForked(l -> { + var indexShard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + indexShard.ensureMutable(l.map(unused -> indexShard)); + }).andThen((l, indexShard) -> { + threadPool.executor(ThreadPool.Names.GET).execute(ActionRunnable.supply(l, () -> { + var multiGetShardResponse = new MultiGetShardResponse(); + var someItemsNotFoundInTranslog = false; + for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { + var item = multiGetShardRequest.items.get(i); + try { + var result = indexShard.getService() + .getFromTranslog( + item.id(), + item.storedFields(), + multiGetShardRequest.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + multiGetShardRequest.isForceSyntheticSource() + ); + GetResponse getResponse = null; + if (result == null) { + someItemsNotFoundInTranslog = true; + } else { + getResponse = new GetResponse(result); + } + multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); + } catch (RuntimeException | IOException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } + logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", request.getShardId(), item.id(), e); + multiGetShardResponse.add( + multiGetShardRequest.locations.get(i), + new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) ); - GetResponse getResponse = null; - if (result == null) { - someItemsNotFoundInTranslog = true; - } else { - getResponse = new GetResponse(result); - } - multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); - } catch (RuntimeException | IOException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; } - logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); - multiGetShardResponse.add( - multiGetShardRequest.locations.get(i), - new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) - ); } - } - long segmentGeneration = -1; - if (someItemsNotFoundInTranslog) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); + long segmentGeneration = -1; + if (someItemsNotFoundInTranslog) { + Engine engine = indexShard.getEngineOrNull(); + if (engine == null) { + throw new AlreadyClosedException("engine closed"); + } + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); - } - return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); - }); + return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); + })); + }).addListener(listener); } public static class Request extends ActionRequest {