Skip to content

Commit 3aa270a

Browse files
committed
Guard Get operations against Engine resets
Relates elastic#124635 Closes ES-11324
1 parent 16441fe commit 3aa270a

File tree

5 files changed

+9
-42
lines changed

5 files changed

+9
-42
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14-
import org.apache.lucene.store.AlreadyClosedException;
1514
import org.elasticsearch.ElasticsearchException;
1615
import org.elasticsearch.ExceptionsHelper;
1716
import org.elasticsearch.action.ActionListener;
@@ -235,10 +234,7 @@ private void getFromTranslog(
235234
final var retryingListener = listener.delegateResponse((l, e) -> {
236235
final var cause = ExceptionsHelper.unwrapCause(e);
237236
logger.debug("get_from_translog failed", cause);
238-
if (cause instanceof ShardNotFoundException
239-
|| cause instanceof IndexNotFoundException
240-
|| cause instanceof AlreadyClosedException) {
241-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
237+
if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) {
242238
logger.debug("retrying get_from_translog");
243239
observer.waitForNextChange(new ClusterStateObserver.Listener() {
244240
@Override
@@ -253,13 +249,7 @@ public void onClusterServiceClose() {
253249

254250
@Override
255251
public void onTimeout(TimeValue timeout) {
256-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
257-
if (cause instanceof AlreadyClosedException) {
258-
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
259-
tryGetFromTranslog(request, indexShard, node, l);
260-
} else {
261-
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
262-
}
252+
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
263253
}
264254
});
265255
} else {

server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.action.get;
1111

12-
import org.apache.lucene.store.AlreadyClosedException;
1312
import org.apache.lucene.util.BytesRef;
1413
import org.elasticsearch.TransportVersions;
1514
import org.elasticsearch.action.ActionListener;
@@ -77,11 +76,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
7776
);
7877
long segmentGeneration = -1;
7978
if (result == null) {
80-
Engine engine = indexShard.getEngineOrNull();
81-
if (engine == null) {
82-
throw new AlreadyClosedException("engine closed");
83-
}
84-
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
79+
segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets);
8580
}
8681
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
8782
});

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14-
import org.apache.lucene.store.AlreadyClosedException;
1514
import org.elasticsearch.ElasticsearchException;
1615
import org.elasticsearch.ExceptionsHelper;
1716
import org.elasticsearch.action.ActionListener;
@@ -216,10 +215,7 @@ private void shardMultiGetFromTranslog(
216215
final var retryingListener = listener.delegateResponse((l, e) -> {
217216
final var cause = ExceptionsHelper.unwrapCause(e);
218217
logger.debug("mget_from_translog[shard] failed", cause);
219-
if (cause instanceof ShardNotFoundException
220-
|| cause instanceof IndexNotFoundException
221-
|| cause instanceof AlreadyClosedException) {
222-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
218+
if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) {
223219
logger.debug("retrying mget_from_translog[shard]");
224220
observer.waitForNextChange(new ClusterStateObserver.Listener() {
225221
@Override
@@ -234,13 +230,7 @@ public void onClusterServiceClose() {
234230

235231
@Override
236232
public void onTimeout(TimeValue timeout) {
237-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
238-
if (cause instanceof AlreadyClosedException) {
239-
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
240-
tryShardMultiGetFromTranslog(request, indexShard, node, l);
241-
} else {
242-
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
243-
}
233+
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
244234
}
245235
});
246236
} else {

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.action.get;
1111

12-
import org.apache.lucene.store.AlreadyClosedException;
1312
import org.elasticsearch.TransportVersions;
1413
import org.elasticsearch.action.ActionListener;
1514
import org.elasticsearch.action.ActionRequest;
@@ -97,11 +96,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
9796
}
9897
long segmentGeneration = -1;
9998
if (someItemsNotFoundInTranslog) {
100-
Engine engine = indexShard.getEngineOrNull();
101-
if (engine == null) {
102-
throw new AlreadyClosedException("engine closed");
103-
}
104-
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
99+
segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets);
105100
}
106101
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
107102
});

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,9 +1296,9 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function
12961296
throw new IllegalStateException("get operations not allowed on a legacy index");
12971297
}
12981298
if (translogOnly) {
1299-
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
1299+
return withEngine(engine -> engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper));
13001300
}
1301-
return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
1301+
return withEngine(engine -> engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper));
13021302
}
13031303

13041304
/**
@@ -4698,9 +4698,6 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera
46984698
* @param listener the listener to be notified when the shard is mutable
46994699
*/
47004700
public void ensureMutable(ActionListener<Void> listener) {
4701-
indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> {
4702-
// TODO ES-10826: Acquire ref to engine and retry if it's immutable again?
4703-
l.onResponse(null);
4704-
}));
4701+
indexEventListener.beforeIndexShardMutableOperation(this, listener.delegateFailure((l, unused) -> l.onResponse(null)));
47054702
}
47064703
}

0 commit comments

Comments
 (0)