diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 57bc18ee7d62b..71798755d7351 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -235,10 +236,11 @@ private void getFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("get_from_translog failed", cause); + // All of the following exceptions can be thrown if the shard is relocated if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException + || cause instanceof IllegalIndexShardStateException || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 logger.debug("retrying get_from_translog"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -253,13 +255,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index 5689c194f82d7..c4c7d8e5e7c7f 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -9,7 +9,6 @@ package org.elasticsearch.action.get; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.BytesRef; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; @@ -65,25 +64,24 @@ protected void doExecute(Task task, Request request, ActionListener li assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert getRequest.realtime(); ActionListener.completeWith(listener, () -> { - var result = indexShard.getService() - .getFromTranslog( - getRequest.id(), - getRequest.storedFields(), - getRequest.realtime(), - getRequest.version(), - getRequest.versionType(), - getRequest.fetchSourceContext(), - getRequest.isForceSyntheticSource() - ); - long segmentGeneration = -1; - if (result == null) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); + // Allows to keep the same engine instance for getFromTranslog and getLastUnsafeSegmentGenerationForGets + return indexShard.withEngineException(engine -> { + var result = indexShard.getService() + .getFromTranslog( + getRequest.id(), + getRequest.storedFields(), + getRequest.realtime(), + getRequest.version(), + getRequest.versionType(), + getRequest.fetchSourceContext(), + getRequest.isForceSyntheticSource() + ); + long segmentGeneration = -1; + if (result == null) { + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); - } - return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); + return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); + }); }); } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 14678d65d32f1..806b55e6ad7c9 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MultiEngineGet; import org.elasticsearch.index.shard.ShardId; @@ -216,10 +217,11 @@ private void shardMultiGetFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("mget_from_translog[shard] failed", cause); + // All of the following exceptions can be thrown if the shard is relocated if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException + || cause instanceof IllegalIndexShardStateException || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 logger.debug("retrying mget_from_translog[shard]"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -234,13 +236,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryShardMultiGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index d0d119e4438f9..46893c9240a4e 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -9,7 +9,6 @@ package org.elasticsearch.action.get; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -62,48 +61,48 @@ protected void doExecute(Task task, Request request, ActionListener li assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert multiGetShardRequest.realtime(); ActionListener.completeWith(listener, () -> { - var multiGetShardResponse = new MultiGetShardResponse(); - var someItemsNotFoundInTranslog = false; - for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { - var item = multiGetShardRequest.items.get(i); - try { - var result = indexShard.getService() - .getFromTranslog( - item.id(), - item.storedFields(), - multiGetShardRequest.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - multiGetShardRequest.isForceSyntheticSource() + // Allows to keep the same engine instance for getFromTranslog and getLastUnsafeSegmentGenerationForGets + return indexShard.withEngineException(engine -> { + var multiGetShardResponse = new MultiGetShardResponse(); + var someItemsNotFoundInTranslog = false; + + for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { + var item = multiGetShardRequest.items.get(i); + try { + var result = indexShard.getService() + .getFromTranslog( + item.id(), + item.storedFields(), + multiGetShardRequest.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + multiGetShardRequest.isForceSyntheticSource() + ); + GetResponse getResponse = null; + if (result == null) { + someItemsNotFoundInTranslog = true; + } else { + getResponse = new GetResponse(result); + } + multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); + } catch (RuntimeException | IOException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } + logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); + multiGetShardResponse.add( + multiGetShardRequest.locations.get(i), + new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) ); - GetResponse getResponse = null; - if (result == null) { - someItemsNotFoundInTranslog = true; - } else { - getResponse = new GetResponse(result); } - multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); - } catch (RuntimeException | IOException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; - } - logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); - multiGetShardResponse.add( - multiGetShardRequest.locations.get(i), - new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) - ); } - } - long segmentGeneration = -1; - if (someItemsNotFoundInTranslog) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); + long segmentGeneration = -1; + if (someItemsNotFoundInTranslog) { + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); - } - return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); + return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); + }); }); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8dd2def14572e..41499532cb40a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1310,9 +1310,9 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function throw new IllegalStateException("get operations not allowed on a legacy index"); } if (translogOnly) { - return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + return withEngine(engine -> engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper)); } - return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + return withEngine(engine -> engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper)); } /** @@ -3399,6 +3399,31 @@ public R withEngine(Function operation) { return withEngine(operation, false); } + /** + * Executes an operation (potentially throwing a checked exception) while preventing the shard's engine instance to be reset during the + * execution. + * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The + * engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param the type of the result + * @param the type of checked exception that the operation can potentially throws. + * @throws AlreadyClosedException if the current engine instance is {@code null}. + */ + public R withEngineException(CheckedFunction operation) throws E { + assert assertCurrentThreadWithEngine(); + assert operation != null; + + engineResetLock.readLock().lock(); + try { + var engine = getCurrentEngine(false); + return operation.apply(engine); + } finally { + engineResetLock.readLock().unlock(); + } + } + /** * Executes an operation while preventing the shard's engine instance to be reset during the execution * (see {@link #resetEngine(Consumer)}. @@ -3413,9 +3438,7 @@ public R withEngine(Function operation) { * @param the type of the result */ private R withEngine(Function operation, boolean allowNoEngine) { - assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); - assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); - assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); + assert assertCurrentThreadWithEngine(); assert operation != null; engineResetLock.readLock().lock(); @@ -3427,6 +3450,14 @@ private R withEngine(Function operation, boolean allowNoEngine) { } } + private static boolean assertCurrentThreadWithEngine() { + var message = "method IndexShard#withEngine (or one of its variant) can block"; + assert ClusterApplierService.assertNotClusterStateUpdateThread(message); + assert MasterService.assertNotMasterUpdateThread(message); + assert Transports.assertNotTransportThread(message); + return true; + } + public void startRecovery( RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, @@ -4664,10 +4695,11 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera * @param listener the listener to be notified when the shard is mutable */ public void ensureMutable(ActionListener listener, boolean permitAcquired) { - indexEventListener.beforeIndexShardMutableOperation(this, permitAcquired, listener.delegateFailure((l, unused) -> { - // TODO ES-10826: Acquire ref to engine and retry if it's immutable again? - l.onResponse(null); - })); + indexEventListener.beforeIndexShardMutableOperation( + this, + permitAcquired, + listener.delegateFailure((l, unused) -> l.onResponse(null)) + ); } // package-private for tests