From 3aa270a36e5823f94605e177483859f49e34d6d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 26 Mar 2025 09:52:58 +0100 Subject: [PATCH 1/2] Guard Get operations against Engine resets Relates #124635 Closes ES-11324 --- .../action/get/TransportGetAction.java | 14 ++------------ .../action/get/TransportGetFromTranslogAction.java | 7 +------ .../action/get/TransportShardMultiGetAction.java | 14 ++------------ .../TransportShardMultiGetFomTranslogAction.java | 7 +------ .../org/elasticsearch/index/shard/IndexShard.java | 9 +++------ 5 files changed, 9 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 57bc18ee7d62b..ac43209b0f906 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -235,10 +234,7 @@ private void getFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("get_from_translog failed", cause); - if (cause instanceof ShardNotFoundException - || cause instanceof IndexNotFoundException - || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 + if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { logger.debug("retrying get_from_translog"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -253,13 +249,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index 5689c194f82d7..c787388fc7b57 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -9,7 +9,6 @@ package org.elasticsearch.action.get; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.BytesRef; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; @@ -77,11 +76,7 @@ protected void doExecute(Task task, Request request, ActionListener li ); long segmentGeneration = -1; if (result == null) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); - } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); + segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets); } return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 14678d65d32f1..10a798fcd22fd 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -216,10 +215,7 @@ private void shardMultiGetFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("mget_from_translog[shard] failed", cause); - if (cause instanceof ShardNotFoundException - || cause instanceof IndexNotFoundException - || cause instanceof AlreadyClosedException) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 + if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { logger.debug("retrying mget_from_translog[shard]"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -234,13 +230,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // TODO AlreadyClosedException the engine reset should be fixed by ES-10826 - if (cause instanceof AlreadyClosedException) { - // Do an additional retry just in case AlreadyClosedException didn't generate a cluster update - tryShardMultiGetFromTranslog(request, indexShard, node, l); - } else { - l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); - } + l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause)); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index d0d119e4438f9..b0de7484ce7c4 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -9,7 +9,6 @@ package org.elasticsearch.action.get; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -97,11 +96,7 @@ protected void doExecute(Task task, Request request, ActionListener li } long segmentGeneration = -1; if (someItemsNotFoundInTranslog) { - Engine engine = indexShard.getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine closed"); - } - segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); + segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets); } return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); }); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4bff703d10e49..34bd94e89926a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1296,9 +1296,9 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function throw new IllegalStateException("get operations not allowed on a legacy index"); } if (translogOnly) { - return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + return withEngine(engine -> engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper)); } - return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper); + return withEngine(engine -> engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper)); } /** @@ -4698,9 +4698,6 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera * @param listener the listener to be notified when the shard is mutable */ public void ensureMutable(ActionListener listener) { - indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> { - // TODO ES-10826: Acquire ref to engine and retry if it's immutable again? - l.onResponse(null); - })); + indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> l.onResponse(null))); } } From 6e1255f9b6ac85a97bddf34c7c05c5e98c4ab3e4 Mon Sep 17 00:00:00 2001 From: tlrx Date: Thu, 10 Apr 2025 15:35:42 +0200 Subject: [PATCH 2/2] update + few changes --- .../action/get/TransportGetAction.java | 8 +- .../get/TransportGetFromTranslogAction.java | 33 +++++---- .../get/TransportShardMultiGetAction.java | 8 +- ...ansportShardMultiGetFomTranslogAction.java | 74 ++++++++++--------- .../elasticsearch/index/shard/IndexShard.java | 37 +++++++++- 5 files changed, 105 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index ac43209b0f906..71798755d7351 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -39,6 +40,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -234,7 +236,11 @@ private void getFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("get_from_translog failed", cause); - if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { + // All of the following exceptions can be thrown if the shard is relocated + if (cause instanceof ShardNotFoundException + || cause instanceof IndexNotFoundException + || cause instanceof IllegalIndexShardStateException + || cause instanceof AlreadyClosedException) { logger.debug("retrying get_from_translog"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java index c787388fc7b57..c4c7d8e5e7c7f 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -64,21 +64,24 @@ protected void doExecute(Task task, Request request, ActionListener li assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert getRequest.realtime(); ActionListener.completeWith(listener, () -> { - var result = indexShard.getService() - .getFromTranslog( - getRequest.id(), - getRequest.storedFields(), - getRequest.realtime(), - getRequest.version(), - getRequest.versionType(), - getRequest.fetchSourceContext(), - getRequest.isForceSyntheticSource() - ); - long segmentGeneration = -1; - if (result == null) { - segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets); - } - return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); + // Allows to keep the same engine instance for getFromTranslog and getLastUnsafeSegmentGenerationForGets + return indexShard.withEngineException(engine -> { + var result = indexShard.getService() + .getFromTranslog( + getRequest.id(), + getRequest.storedFields(), + getRequest.realtime(), + getRequest.version(), + getRequest.versionType(), + getRequest.fetchSourceContext(), + getRequest.isForceSyntheticSource() + ); + long segmentGeneration = -1; + if (result == null) { + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); + } + return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration); + }); }); } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 10a798fcd22fd..806b55e6ad7c9 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -39,6 +40,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MultiEngineGet; import org.elasticsearch.index.shard.ShardId; @@ -215,7 +217,11 @@ private void shardMultiGetFromTranslog( final var retryingListener = listener.delegateResponse((l, e) -> { final var cause = ExceptionsHelper.unwrapCause(e); logger.debug("mget_from_translog[shard] failed", cause); - if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) { + // All of the following exceptions can be thrown if the shard is relocated + if (cause instanceof ShardNotFoundException + || cause instanceof IndexNotFoundException + || cause instanceof IllegalIndexShardStateException + || cause instanceof AlreadyClosedException) { logger.debug("retrying mget_from_translog[shard]"); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java index b0de7484ce7c4..46893c9240a4e 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java @@ -61,44 +61,48 @@ protected void doExecute(Task task, Request request, ActionListener li assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); assert multiGetShardRequest.realtime(); ActionListener.completeWith(listener, () -> { - var multiGetShardResponse = new MultiGetShardResponse(); - var someItemsNotFoundInTranslog = false; - for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { - var item = multiGetShardRequest.items.get(i); - try { - var result = indexShard.getService() - .getFromTranslog( - item.id(), - item.storedFields(), - multiGetShardRequest.realtime(), - item.version(), - item.versionType(), - item.fetchSourceContext(), - multiGetShardRequest.isForceSyntheticSource() + // Allows to keep the same engine instance for getFromTranslog and getLastUnsafeSegmentGenerationForGets + return indexShard.withEngineException(engine -> { + var multiGetShardResponse = new MultiGetShardResponse(); + var someItemsNotFoundInTranslog = false; + + for (int i = 0; i < multiGetShardRequest.locations.size(); i++) { + var item = multiGetShardRequest.items.get(i); + try { + var result = indexShard.getService() + .getFromTranslog( + item.id(), + item.storedFields(), + multiGetShardRequest.realtime(), + item.version(), + item.versionType(), + item.fetchSourceContext(), + multiGetShardRequest.isForceSyntheticSource() + ); + GetResponse getResponse = null; + if (result == null) { + someItemsNotFoundInTranslog = true; + } else { + getResponse = new GetResponse(result); + } + multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); + } catch (RuntimeException | IOException e) { + if (TransportActions.isShardNotAvailableException(e)) { + throw e; + } + logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); + multiGetShardResponse.add( + multiGetShardRequest.locations.get(i), + new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) ); - GetResponse getResponse = null; - if (result == null) { - someItemsNotFoundInTranslog = true; - } else { - getResponse = new GetResponse(result); } - multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse); - } catch (RuntimeException | IOException e) { - if (TransportActions.isShardNotAvailableException(e)) { - throw e; - } - logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e); - multiGetShardResponse.add( - multiGetShardRequest.locations.get(i), - new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e) - ); } - } - long segmentGeneration = -1; - if (someItemsNotFoundInTranslog) { - segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets); - } - return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); + long segmentGeneration = -1; + if (someItemsNotFoundInTranslog) { + segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets(); + } + return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration); + }); }); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b1f16033a53fd..41499532cb40a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3399,6 +3399,31 @@ public R withEngine(Function operation) { return withEngine(operation, false); } + /** + * Executes an operation (potentially throwing a checked exception) while preventing the shard's engine instance to be reset during the + * execution. + * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The + * engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param the type of the result + * @param the type of checked exception that the operation can potentially throws. + * @throws AlreadyClosedException if the current engine instance is {@code null}. + */ + public R withEngineException(CheckedFunction operation) throws E { + assert assertCurrentThreadWithEngine(); + assert operation != null; + + engineResetLock.readLock().lock(); + try { + var engine = getCurrentEngine(false); + return operation.apply(engine); + } finally { + engineResetLock.readLock().unlock(); + } + } + /** * Executes an operation while preventing the shard's engine instance to be reset during the execution * (see {@link #resetEngine(Consumer)}. @@ -3413,9 +3438,7 @@ public R withEngine(Function operation) { * @param the type of the result */ private R withEngine(Function operation, boolean allowNoEngine) { - assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); - assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); - assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); + assert assertCurrentThreadWithEngine(); assert operation != null; engineResetLock.readLock().lock(); @@ -3427,6 +3450,14 @@ private R withEngine(Function operation, boolean allowNoEngine) { } } + private static boolean assertCurrentThreadWithEngine() { + var message = "method IndexShard#withEngine (or one of its variant) can block"; + assert ClusterApplierService.assertNotClusterStateUpdateThread(message); + assert MasterService.assertNotMasterUpdateThread(message); + assert Transports.assertNotTransportThread(message); + return true; + } + public void startRecovery( RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,