diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index d474eee8375b3..dd0771b0dab1a 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -237,10 +236,7 @@ private void getFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("get_from_translog failed", cause); - if (cause instanceof ShardNotFoundException - || cause instanceof IndexNotFoundException - || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 + if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { logger.debug("retrying get_from_translog"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -255,13 +251,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index 6fc1ff5300101..36f011e2ed2cc 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -16,15 +16,16 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.get.GetResult; @@ -49,42 +50,47 @@ public class TransportGetFromTranslogAction extends HandledTransportAction< public static final Logger logger = LogManager.getLogger(TransportGetFromTranslogAction.class); private final IndicesService indicesService; + private final ThreadPool threadPool; @Inject public 
TransportGetFromTranslogAction(TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) { super(NAME, transportService, actionFilters, Request::new, transportService.getThreadPool().executor(ThreadPool.Names.GET)); this.indicesService = indicesService; + this.threadPool = transportService.getThreadPool(); } @Override protected void doExecute(Task task, Request request, ActionListener listener) { final GetRequest getRequest = request.getRequest(); - final ShardId shardId = request.shardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert getRequest.realtime(); - ActionListener.completeWith(listener, () -> { - var result = indexShard.getService() - .getFromTranslog( - getRequest.id(), - getRequest.storedFields(), - getRequest.realtime(), - getRequest.version(), - getRequest.versionType(), - getRequest.fetchSourceContext(), - getRequest.isForceSyntheticSource() - ); - long segmentGeneration = -1; - if (result == null) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); + + SubscribableListener.newForked(l -> { + var indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + indexShard.ensureMutable(l.map(unused -> indexShard)); + }).andThen((l, indexShard) -> { + threadPool.executor(ThreadPool.Names.GET).execute(ActionRunnable.supply(l, () -> { + var result = indexShard.getService() + .getFromTranslog( + getRequest.id(), + getRequest.storedFields(), + getRequest.realtime(), + getRequest.version(), + getRequest.versionType(), + getRequest.fetchSourceContext(), + getRequest.isForceSyntheticSource() + ); + long segmentGeneration = -1; + if (result == null) { + Engine engine = indexShard.getEngineOrNull(); + if (engine == null) { + throw new AlreadyClosedException("engine closed"); + } + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); - } - return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); - }); + return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); + })); + }).addListener(listener); } public static class Request extends ActionRequest implements IndicesRequest { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 9002f7eb6d053..12752a896837a 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -218,10 +217,7 @@ private void shardMultiGetFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); 
logger.debug("mget_from_translog[shard] failed", cause); - if (cause instanceof ShardNotFoundException - || cause instanceof IndexNotFoundException - || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 + if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { logger.debug("retrying mget_from_translog[shard]"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -236,13 +232,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryShardMultiGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index ec0b5c6cf143f..50fe35f20342b 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -15,13 +15,14 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -43,6 +44,7 @@ public class TransportShardMultiGetFomTranslogAction extends HandledTransportAct public static final Logger logger = LogManager.getLogger(TransportShardMultiGetFomTranslogAction.class); private final IndicesService indicesService; + private final ThreadPool threadPool; protected TransportShardMultiGetFomTranslogAction( TransportService transportService, @@ -51,60 +53,64 @@ protected TransportShardMultiGetFomTranslogAction( ) { super(NAME, transportService, actionFilters, Request::new, transportService.getThreadPool().executor(ThreadPool.Names.GET)); this.indicesService = indicesService; + this.threadPool = transportService.getThreadPool(); } @Override protected void doExecute(Task task, Request request, ActionListener listener) { var multiGetShardRequest = request.getMultiGetShardRequest(); - var shardId = request.getShardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert multiGetShardRequest.realtime(); - 
ActionListener.completeWith(listener, () -> { - var multiGetShardResponse = new MultiGetShardResponse(); - var someItemsNotFoundInTranslog = false; - for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { - var item = multiGetShardRequest.items.get(i); - try { - var result = indexShard.getService() - .getFromTranslog( - item.id(), - item.storedFields(), - multiGetShardRequest.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - multiGetShardRequest.isForceSyntheticSource() + + SubscribableListener.newForked(l -> { + var indexShard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + indexShard.ensureMutable(l.map(unused -> indexShard)); + }).andThen((l, indexShard) -> { + threadPool.executor(ThreadPool.Names.GET).execute(ActionRunnable.supply(l, () -> { + var multiGetShardResponse = new MultiGetShardResponse(); + var someItemsNotFoundInTranslog = false; + for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { + var item = multiGetShardRequest.items.get(i); + try { + var result = indexShard.getService() + .getFromTranslog( + item.id(), + item.storedFields(), + multiGetShardRequest.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + multiGetShardRequest.isForceSyntheticSource() + ); + GetResponse getResponse = null; + if (result == null) { + someItemsNotFoundInTranslog = true; + } else { + getResponse = new GetResponse(result); + } + multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); + } catch (RuntimeException | IOException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } + logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", request.getShardId(), item.id(), e); + multiGetShardResponse.add( + multiGetShardRequest.locations.get(i), + new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) ); - GetResponse getResponse = null; - if (result == null) { - someItemsNotFoundInTranslog = true; - } else { - getResponse = new GetResponse(result); - } - multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); - } catch (RuntimeException | IOException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; } - logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); - multiGetShardResponse.add( - multiGetShardRequest.locations.get(i), - new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) - ); } - } - long segmentGeneration = -1; - if (someItemsNotFoundInTranslog) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); + long segmentGeneration = -1; + if (someItemsNotFoundInTranslog) { + Engine engine = indexShard.getEngineOrNull(); + if (engine == null) { + throw new AlreadyClosedException("engine closed"); + } + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); - } - return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); - }); + return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); + })); + }).addListener(listener); } public static class Request extends ActionRequest { diff --git 
a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchDirectoryReaderRefreshListener.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchDirectoryReaderRefreshListener.java new file mode 100644 index 0000000000000..61dae902b00f9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchDirectoryReaderRefreshListener.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class ElasticsearchDirectoryReaderRefreshListener { + + private final ReferenceManager readerManager; + private final List listeners; + + public ElasticsearchDirectoryReaderRefreshListener( + ReferenceManager readerManager, + List listeners + ) { + this.readerManager = Objects.requireNonNull(readerManager); + this.listeners = List.copyOf(listeners); + this.readerManager.addListener(new InternalRefreshListener()); + } + + private class InternalRefreshListener implements ReferenceManager.RefreshListener { + + @Override + public void beforeRefresh() throws IOException { + for (var listener : listeners) { + listener.beforeRefresh(); + } + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + var reader = readerManager.acquire(); + try { + for (var listener : listeners) { + if (listener instanceof ReaderAwareRefreshListener l) { + l.afterRefresh(didRefresh, reader); + } else { + listener.afterRefresh(didRefresh); + } + } + } finally { + readerManager.release(reader); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 3298d8757ca92..e16bca6250762 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -989,6 +989,10 @@ protected final void ensureOpen() { ensureOpen(null); } + public boolean isMutable() { + return true; + } + /** get commits stats for the last commit */ public final CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); @@ -2341,13 +2345,17 @@ public record FlushResult(boolean flushPerformed, long generation) { } /** - * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}. + * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(boolean)}. * * In general, resetting the engine should be done with care, to consider any * in-progress operations and listeners (e.g., primary term and generation listeners). * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. 
*/ - public void prepareForEngineReset() throws IOException { + public void beforeReset() throws IOException { + throw new UnsupportedOperationException("does not support engine reset"); + } + + public void afterReset() throws IOException { throw new UnsupportedOperationException("does not support engine reset"); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fc7f6eab0856c..063993017688b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -304,12 +304,12 @@ public InternalEngine(EngineConfig engineConfig) { assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; // don't allow commits until we are done with recovering pendingTranslogRecovery.set(true); - for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { - this.externalReaderManager.addListener(listener); - } - for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { - this.internalReaderManager.addListener(listener); - } + new ElasticsearchDirectoryReaderRefreshListener(externalReaderManager, engineConfig.getExternalRefreshListener()); // add itself + // as + // listener + new ElasticsearchDirectoryReaderRefreshListener(internalReaderManager, engineConfig.getInternalRefreshListener()); // add itself + // as + // listener this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java b/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java new file mode 100644 index 0000000000000..3f99359dffb1a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ReaderAwareRefreshListener.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; + +import java.io.IOException; + +public interface ReaderAwareRefreshListener extends ReferenceManager.RefreshListener { + + @Override + default void afterRefresh(boolean didRefresh) throws IOException {} + + void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException; + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7fecc53826ff1..84df22924ee92 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -45,6 +45,8 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -92,6 +94,7 @@ import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; +import org.elasticsearch.index.engine.ReaderAwareRefreshListener; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.engine.Segment; @@ -152,6 +155,7 @@ import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -178,6 +182,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -240,8 +245,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // ensure happens-before relation between addRefreshListener() and postRecovery() private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex - private final AtomicReference currentEngineReference = new AtomicReference<>(); + + private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); // lock ordering: engineLock.writeLock -> mutex + private Engine currentEngine = null; // must be accessed while holding engineLock final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -700,7 +706,7 @@ public void onFailure(Exception e) { this.shardRouting = newRouting; assert this.shardRouting.primary() == false || this.shardRouting.started() == false || // note that we use started and not - // active to avoid relocating shards + // active to avoid relocating shards this.indexShardOperationPermits.isBlocked() || // 
if permits are blocked, we are still transitioning this.replicationTracker.isPrimaryMode() : "a started primary with non-pending operation term must be in primary mode " + this.shardRouting; @@ -860,7 +866,7 @@ public void onFailure(Exception e) { } } }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by - // CancellableThreads and we want to be able to interrupt it + // CancellableThreads and we want to be able to interrupt it } } @@ -1277,10 +1283,12 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function if (indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) { throw new IllegalStateException("get operations not allowed on a legacy index"); } - if (translogOnly) { - return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); - } - return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + return withMutableEngine(engine -> { + if (translogOnly) { + return engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + } + return engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + }); } /** @@ -1613,7 +1621,8 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. final Engine engine = getEngineOrNull(); @@ -1623,6 +1632,8 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { if (indexCommit == null) { return store.getMetadata(null, true); } + } finally { + engineLock.writeLock().unlock(); } return store.getMetadata(indexCommit.getIndexCommit()); } finally { @@ -1776,14 +1787,15 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); + } + checkAndCallWaitForEngineOrClosedShardListeners(); + } finally { try { - synchronized (mutex) { - changeState(IndexShardState.CLOSED, reason); - } - checkAndCallWaitForEngineOrClosedShardListeners(); - } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); + final Engine engine = getAndSetCurrentEngine(null); closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { @Override public void run() throws Exception { @@ -1809,6 +1821,8 @@ public String toString() { return "IndexShard#close[" + shardId + "]"; } })); + } finally { + engineLock.writeLock().unlock(); } } } @@ -1857,7 +1871,7 @@ public void prepareForIndexRecovery() { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert currentEngineReference.get() == null; + assert this.currentEngine == null; } /** @@ -1936,8 +1950,11 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. 
.newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; - synchronized (engineMutex) { - IOUtils.close(currentEngineReference.getAndSet(null)); + engineLock.writeLock().lock(); + try { + IOUtils.close(getAndSetCurrentEngine(null)); + } finally { + engineLock.writeLock().unlock(); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2167,16 +2184,19 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - synchronized (engineMutex) { - assert currentEngineReference.get() == null : "engine is running"; + engineLock.writeLock().lock(); + try { + assert this.currentEngine == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = createEngine(config); onNewEngine(newEngine); - currentEngineReference.set(newEngine); + getAndSetCurrentEngine(newEngine); // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -2241,7 +2261,8 @@ private boolean assertLastestCommitUserData() throws IOException { } private void onNewEngine(Engine newEngine) { - assert Thread.holdsLock(engineMutex); + assert engineLock.isWriteLockedByCurrentThread(); + refreshPendingLocationListener.setTranslogLastWriteLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -2252,10 +2273,13 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(currentEngineReference.getAndSet(null)); + IOUtils.close(getAndSetCurrentEngine(null)); resetRecoveryStage(); + } finally { + engineLock.writeLock().unlock(); } } @@ -2264,7 +2288,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert currentEngineReference.get() == null; + assert this.currentEngine == null; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -2593,9 +2617,20 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(); - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + // This method can be called within the cluster state applier thread + if (engineLock.readLock().tryLock() == false) { + // Attempt to acquire a read lock failed: + 
// - the engine is closing, in which case we don't need to apply the updated index settings + // - otherwise the onSettingsChanged() should be called again after the new engine is created and the write lock is released + return; + } + try { + var engineOrNull = getCurrentEngine(true); + if (engineOrNull != null) { + engineOrNull.onSettingsChanged(); + } + } finally { + engineLock.readLock().unlock(); } } @@ -3286,11 +3321,12 @@ private void doCheckIndex() throws IOException { } Engine getEngine() { - Engine engine = getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine is closed"); + engineLock.readLock().lock(); + try { + return getCurrentEngine(false); + } finally { + engineLock.readLock().unlock(); } - return engine; } /** @@ -3298,7 +3334,88 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { - return this.currentEngineReference.get(); + engineLock.readLock().lock(); + try { + return getCurrentEngine(true); + } finally { + engineLock.readLock().unlock(); + } + } + + public R withMutableEngine(Function operation) { + return withEngine(operation, true, false); + } + + public R withMutableEngineOrNull(Function operation) { + return withEngine(operation, true, true); + } + + public R withImmutableEngine(Function operation) { + return withEngine(operation, false, false); + } + + /** + * Executes an operation while preventing the shard's engine instance to be changed or closed during the execution. The parameter + * {@code requiredMutability} can be used to force the engine to be reset to a given mutable or immutable state before executing the + * operation. The parameter {@code allowNoEngine} is used to allow the operation to be executed with a null engine instance, in which + * case the {@code requiredMutability} is ignored. When {@code allowNoEngine} is set to {@code `false`} the method will throw an + * {@link AlreadyClosedException} if the current engine is null and won't reset the engine to the required mutability state. 
+ * + * @param operation + * @param requiredMutability + * @param allowNoEngine + * @return + * @param + */ + private R withEngine(Function operation, boolean requiredMutability, boolean allowNoEngine) { + assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); + assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); + assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); + assert operation != null; + + engineLock.readLock().lock(); + var release = true; + try { + var engine = getCurrentEngine(allowNoEngine); + if (engine != null && (engine.isMutable() == requiredMutability) == false) { + engineLock.readLock().unlock(); + release = false; + engineLock.writeLock().lock(); + try { + engine = getCurrentEngine(allowNoEngine); + if (engine != null && (engine.isMutable() == requiredMutability) == false) { + resetEngine(requiredMutability); + engine = getCurrentEngine(allowNoEngine); + } + engineLock.readLock().lock(); + release = true; + } finally { + engineLock.writeLock().unlock(); + } + } + assert engine == null || engine.isMutable() == requiredMutability; + return operation.apply(engine); + } finally { + if (release) { + engineLock.readLock().unlock(); + } + } + } + + private Engine getCurrentEngine(boolean allowNoEngine) { + assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread(); + var engine = this.currentEngine; + if (engine == null && allowNoEngine == false) { + throw new AlreadyClosedException("engine is closed"); + } + return engine; + } + + private Engine getAndSetCurrentEngine(Engine newEngine) { + assert engineLock.isWriteLockedByCurrentThread(); + var previousEngine = this.currentEngine; + this.currentEngine = newEngine; + return previousEngine; } public void startRecovery( @@ -4083,12 +4200,17 @@ private void setRefreshPending(Engine engine) { } private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener { + Supplier supplier; Translog.Location lastWriteLocation; + public void setTranslogLastWriteLocationSupplier(Supplier translogLastWriteLocation) { + this.supplier = translogLastWriteLocation; + } + @Override public void beforeRefresh() { try { - lastWriteLocation = getEngine().getTranslogLastWriteLocation(); + lastWriteLocation = supplier.get(); } catch (AlreadyClosedException exc) { // shard is closed - no location is fine lastWriteLocation = null; @@ -4109,18 +4231,14 @@ public void afterRefresh(boolean didRefresh) { } } - private class RefreshFieldHasValueListener implements ReferenceManager.RefreshListener { + private class RefreshFieldHasValueListener implements ReaderAwareRefreshListener { @Override public void beforeRefresh() {} @Override - public void afterRefresh(boolean didRefresh) { + public void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException { if (enableFieldHasValue && (didRefresh || fieldInfos == FieldInfos.EMPTY)) { - try (Engine.Searcher hasValueSearcher = getEngine().acquireSearcher("field_has_value")) { - setFieldInfos(FieldInfos.getMergedFieldInfos(hasValueSearcher.getIndexReader())); - } catch (AlreadyClosedException ignore) { - // engine is closed - no updated FieldInfos is fine - } + setFieldInfos(FieldInfos.getMergedFieldInfos(reader.getContext().reader())); } } } @@ -4133,35 +4251,29 @@ public ShardFieldStats getShardFieldStats() { return shardFieldStats; } - private class RefreshShardFieldStatsListener implements ReferenceManager.RefreshListener { + 
private class RefreshShardFieldStatsListener implements ReaderAwareRefreshListener { @Override - public void beforeRefresh() { - - } + public void beforeRefresh() {} @Override - public void afterRefresh(boolean didRefresh) { + public void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException { if (shardFieldStats == null || didRefresh) { - try (var searcher = getEngine().acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL)) { - int numSegments = 0; - int totalFields = 0; - long usages = 0; - for (LeafReaderContext leaf : searcher.getLeafContexts()) { - numSegments++; - var fieldInfos = leaf.reader().getFieldInfos(); - totalFields += fieldInfos.size(); - if (fieldInfos instanceof FieldInfosWithUsages ft) { - if (usages != -1) { - usages += ft.getTotalUsages(); - } - } else { - usages = -1; + int numSegments = 0; + int totalFields = 0; + long usages = 0; + for (LeafReaderContext leaf : reader.getContext().leaves()) { + numSegments++; + var fieldInfos = leaf.reader().getFieldInfos(); + totalFields += fieldInfos.size(); + if (fieldInfos instanceof FieldInfosWithUsages ft) { + if (usages != -1) { + usages += ft.getTotalUsages(); } + } else { + usages = -1; } - shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages); - } catch (AlreadyClosedException ignored) { - } + shardFieldStats = new ShardFieldStats(numSegments, totalFields, usages); } } } @@ -4302,22 +4414,29 @@ public void afterRefresh(boolean didRefresh) { /** * Reset the current engine to a new one. * - * Calls {@link Engine#prepareForEngineReset()} on the current engine, then closes it, and loads a new engine without + * Calls {@link Engine#beforeReset()} on the current engine, then closes it, and loads a new engine without * doing any translog recovery. * * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. */ - public void resetEngine() { + private void resetEngine(boolean mutability) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { - synchronized (engineMutex) { + engineLock.writeLock().lock(); // might already be held + try { verifyNotClosed(); - getEngine().prepareForEngineReset(); - var newEngine = createEngine(newEngineConfig(replicationTracker)); - IOUtils.close(currentEngineReference.getAndSet(newEngine)); - onNewEngine(newEngine); + if (currentEngine.isMutable() != mutability) { + currentEngine.beforeReset(); + var newEngine = createEngine(newEngineConfig(replicationTracker)); + assert newEngine.isMutable() == mutability : newEngine.isMutable() + " != " + mutability; + IOUtils.close(getAndSetCurrentEngine(newEngine)); + onNewEngine(newEngine); + currentEngine.afterReset(); + } + } finally { + engineLock.writeLock().unlock(); } onSettingsChanged(); } catch (Exception e) { @@ -4342,7 +4461,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. 
@@ -4357,41 +4477,52 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); + } finally { + engineLock.readLock().unlock(); } } @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } return newEngineReference.get().acquireSafeIndexCommit(); + } finally { + engineLock.readLock().unlock(); } } @Override public void close() throws IOException { Engine newEngine; - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { + if (newEngine == getCurrentEngine(true)) { // we successfully installed the new engine so do not close it. newEngine = null; } + } finally { + engineLock.readLock().unlock(); } IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); + } finally { + engineLock.writeLock().unlock(); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, @@ -4403,12 +4534,15 @@ public void close() throws IOException { ); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); + IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. 
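For readers following the IndexShard changes above, here is a minimal, standalone sketch of the locking pattern that withEngine() relies on: check the current engine under the read lock, and only if it does not have the required mutability release the read lock, take the write lock, re-check, reset, and then downgrade back to the read lock before invoking the caller's operation. This is an illustration under assumptions, not Elasticsearch code; the names EngineHolderSketch, isAcceptable and reset are invented for the example (in the actual change they correspond to IndexShard, engine.isMutable() == requiredMutability, and resetEngine(boolean)).

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;

class EngineHolderSketch<E> {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Function<E, E> reset; // closes the old instance and returns its replacement
    private E current;

    EngineHolderSketch(E initial, Function<E, E> reset) {
        this.current = initial;
        this.reset = reset;
    }

    <R> R withEngine(Function<E, R> operation, Predicate<E> isAcceptable) {
        lock.readLock().lock();
        boolean readHeld = true;
        try {
            E engine = current;
            if (engine != null && isAcceptable.test(engine) == false) {
                // ReentrantReadWriteLock cannot upgrade in place: release the read lock,
                // take the write lock, and re-check because another thread may have won the race.
                lock.readLock().unlock();
                readHeld = false;
                lock.writeLock().lock();
                try {
                    engine = current;
                    if (engine != null && isAcceptable.test(engine) == false) {
                        current = reset.apply(engine);
                        engine = current;
                    }
                    // Downgrade: re-acquire the read lock before releasing the write lock so the
                    // instance cannot be swapped between the reset and the operation below.
                    lock.readLock().lock();
                    readHeld = true;
                } finally {
                    lock.writeLock().unlock();
                }
            }
            return operation.apply(engine);
        } finally {
            if (readHeld) {
                lock.readLock().unlock();
            }
        }
    }
}

The same downgrade idiom is why withEngine() in the diff re-acquires engineLock.readLock() inside the write-locked block before releasing the write lock.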
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 975565b73a0d6..9fc22addb89fc 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -4577,11 +4577,22 @@ public void testResetEngineToGlobalCheckpoint() throws Exception {
 
     public void testResetEngine() throws Exception {
         var newEngineCreated = new CountDownLatch(2);
+        final AtomicBoolean shared = new AtomicBoolean();
         var indexShard = newStartedShard(true, Settings.EMPTY, config -> {
             try {
                 return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true) {
                     @Override
-                    public void prepareForEngineReset() throws IOException {}
+                    public void beforeReset() {
+                        shared.set(true);
+                    }
+
+                    @Override
+                    public boolean isMutable() {
+                        return shared.get();
+                    }
+
+                    @Override
+                    public void afterReset() throws IOException {}
                 };
             } finally {
                 newEngineCreated.countDown();
@@ -4593,7 +4604,7 @@ public void prepareForEngineReset() throws IOException {}
         var onAcquired = new PlainActionFuture<Releasable>();
         indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L));
         try (var permits = safeGet(onAcquired)) {
-            indexShard.resetEngine();
+            indexShard.withMutableEngine(ignored -> ignored);
         }
         safeAwait(newEngineCreated);
         safeAwait(newEngineNotification);
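As a usage note on the refresh-listener plumbing introduced above: ElasticsearchDirectoryReaderRefreshListener acquires the refreshed ElasticsearchDirectoryReader once per refresh and hands it to every listener that implements ReaderAwareRefreshListener, which is what allows RefreshFieldHasValueListener and RefreshShardFieldStatsListener to drop their getEngine().acquireSearcher(...) calls and the surrounding AlreadyClosedException handling. The listener below is a hypothetical example written against the interface added in this change; the class name and the segment-count statistic are illustrative only.

import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.engine.ReaderAwareRefreshListener;

import java.io.IOException;

class SegmentCountRefreshListener implements ReaderAwareRefreshListener {

    private volatile int segmentCount;

    @Override
    public void beforeRefresh() {}

    @Override
    public void afterRefresh(boolean didRefresh, ElasticsearchDirectoryReader reader) throws IOException {
        if (didRefresh) {
            // The dispatcher acquires the reader before this call and releases it afterwards,
            // so the reference is guaranteed to stay live for the duration of the callback.
            segmentCount = reader.getContext().leaves().size();
        }
    }

    int segmentCount() {
        return segmentCount;
    }
}

Because ReaderAwareRefreshListener extends ReferenceManager.RefreshListener, such a listener can be passed through the existing EngineConfig refresh-listener lists unchanged; the dispatcher decides at refresh time whether to invoke the reader-aware or the plain afterRefresh callback.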