Skip to content

Commit 7ad2369

Browse files
fcofdez and tlrx authored
Guard Get operations against Engine resets (#125646)
Closes ES-11324 --------- Co-authored-by: tlrx <[email protected]>
1 parent 31995ac commit 7ad2369

File tree

5 files changed

+104
-83
lines changed

5 files changed

+104
-83
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.index.IndexService;
4141
import org.elasticsearch.index.engine.Engine;
4242
import org.elasticsearch.index.get.GetResult;
43+
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
4344
import org.elasticsearch.index.shard.IndexShard;
4445
import org.elasticsearch.index.shard.ShardId;
4546
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -235,10 +236,11 @@ private void getFromTranslog(
235236
final var retryingListener = listener.delegateResponse((l, e) -> {
236237
final var cause = ExceptionsHelper.unwrapCause(e);
237238
logger.debug("get_from_translog failed", cause);
239+
// All of the following exceptions can be thrown if the shard is relocated
238240
if (cause instanceof ShardNotFoundException
239241
|| cause instanceof IndexNotFoundException
242+
|| cause instanceof IllegalIndexShardStateException
240243
|| cause instanceof AlreadyClosedException) {
241-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
242244
logger.debug("retrying get_from_translog");
243245
observer.waitForNextChange(new ClusterStateObserver.Listener() {
244246
@Override
@@ -253,13 +255,7 @@ public void onClusterServiceClose() {
253255

254256
@Override
255257
public void onTimeout(TimeValue timeout) {
256-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
257-
if (cause instanceof AlreadyClosedException) {
258-
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
259-
tryGetFromTranslog(request, indexShard, node, l);
260-
} else {
261-
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
262-
}
258+
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
263259
}
264260
});
265261
} else {

server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.action.get;
1111

12-
import org.apache.lucene.store.AlreadyClosedException;
1312
import org.apache.lucene.util.BytesRef;
1413
import org.elasticsearch.TransportVersions;
1514
import org.elasticsearch.action.ActionListener;
@@ -65,25 +64,24 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
6564
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
6665
assert getRequest.realtime();
6766
ActionListener.completeWith(listener, () -> {
68-
var result = indexShard.getService()
69-
.getFromTranslog(
70-
getRequest.id(),
71-
getRequest.storedFields(),
72-
getRequest.realtime(),
73-
getRequest.version(),
74-
getRequest.versionType(),
75-
getRequest.fetchSourceContext(),
76-
getRequest.isForceSyntheticSource()
77-
);
78-
long segmentGeneration = -1;
79-
if (result == null) {
80-
Engine engine = indexShard.getEngineOrNull();
81-
if (engine == null) {
82-
throw new AlreadyClosedException("engine closed");
67+
// Use the same engine instance for both getFromTranslog and getLastUnsafeSegmentGenerationForGets
68+
return indexShard.withEngineException(engine -> {
69+
var result = indexShard.getService()
70+
.getFromTranslog(
71+
getRequest.id(),
72+
getRequest.storedFields(),
73+
getRequest.realtime(),
74+
getRequest.version(),
75+
getRequest.versionType(),
76+
getRequest.fetchSourceContext(),
77+
getRequest.isForceSyntheticSource()
78+
);
79+
long segmentGeneration = -1;
80+
if (result == null) {
81+
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
8382
}
84-
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
85-
}
86-
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
83+
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
84+
});
8785
});
8886
}
8987

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.index.IndexService;
4141
import org.elasticsearch.index.engine.Engine;
4242
import org.elasticsearch.index.get.GetResult;
43+
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
4344
import org.elasticsearch.index.shard.IndexShard;
4445
import org.elasticsearch.index.shard.MultiEngineGet;
4546
import org.elasticsearch.index.shard.ShardId;
@@ -216,10 +217,11 @@ private void shardMultiGetFromTranslog(
216217
final var retryingListener = listener.delegateResponse((l, e) -> {
217218
final var cause = ExceptionsHelper.unwrapCause(e);
218219
logger.debug("mget_from_translog[shard] failed", cause);
220+
// All of the following exceptions can be thrown if the shard is relocated
219221
if (cause instanceof ShardNotFoundException
220222
|| cause instanceof IndexNotFoundException
223+
|| cause instanceof IllegalIndexShardStateException
221224
|| cause instanceof AlreadyClosedException) {
222-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
223225
logger.debug("retrying mget_from_translog[shard]");
224226
observer.waitForNextChange(new ClusterStateObserver.Listener() {
225227
@Override
@@ -234,13 +236,7 @@ public void onClusterServiceClose() {
234236

235237
@Override
236238
public void onTimeout(TimeValue timeout) {
237-
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
238-
if (cause instanceof AlreadyClosedException) {
239-
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
240-
tryShardMultiGetFromTranslog(request, indexShard, node, l);
241-
} else {
242-
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
243-
}
239+
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
244240
}
245241
});
246242
} else {

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.action.get;
1111

12-
import org.apache.lucene.store.AlreadyClosedException;
1312
import org.elasticsearch.TransportVersions;
1413
import org.elasticsearch.action.ActionListener;
1514
import org.elasticsearch.action.ActionRequest;
@@ -62,48 +61,48 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
6261
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
6362
assert multiGetShardRequest.realtime();
6463
ActionListener.completeWith(listener, () -> {
65-
var multiGetShardResponse = new MultiGetShardResponse();
66-
var someItemsNotFoundInTranslog = false;
67-
for (int i = 0; i < multiGetShardRequest.locations.size(); i++) {
68-
var item = multiGetShardRequest.items.get(i);
69-
try {
70-
var result = indexShard.getService()
71-
.getFromTranslog(
72-
item.id(),
73-
item.storedFields(),
74-
multiGetShardRequest.realtime(),
75-
item.version(),
76-
item.versionType(),
77-
item.fetchSourceContext(),
78-
multiGetShardRequest.isForceSyntheticSource()
64+
// Use the same engine instance for both getFromTranslog and getLastUnsafeSegmentGenerationForGets
65+
return indexShard.withEngineException(engine -> {
66+
var multiGetShardResponse = new MultiGetShardResponse();
67+
var someItemsNotFoundInTranslog = false;
68+
69+
for (int i = 0; i < multiGetShardRequest.locations.size(); i++) {
70+
var item = multiGetShardRequest.items.get(i);
71+
try {
72+
var result = indexShard.getService()
73+
.getFromTranslog(
74+
item.id(),
75+
item.storedFields(),
76+
multiGetShardRequest.realtime(),
77+
item.version(),
78+
item.versionType(),
79+
item.fetchSourceContext(),
80+
multiGetShardRequest.isForceSyntheticSource()
81+
);
82+
GetResponse getResponse = null;
83+
if (result == null) {
84+
someItemsNotFoundInTranslog = true;
85+
} else {
86+
getResponse = new GetResponse(result);
87+
}
88+
multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse);
89+
} catch (RuntimeException | IOException e) {
90+
if (TransportActions.isShardNotAvailableException(e)) {
91+
throw e;
92+
}
93+
logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e);
94+
multiGetShardResponse.add(
95+
multiGetShardRequest.locations.get(i),
96+
new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e)
7997
);
80-
GetResponse getResponse = null;
81-
if (result == null) {
82-
someItemsNotFoundInTranslog = true;
83-
} else {
84-
getResponse = new GetResponse(result);
8598
}
86-
multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse);
87-
} catch (RuntimeException | IOException e) {
88-
if (TransportActions.isShardNotAvailableException(e)) {
89-
throw e;
90-
}
91-
logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e);
92-
multiGetShardResponse.add(
93-
multiGetShardRequest.locations.get(i),
94-
new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e)
95-
);
9699
}
97-
}
98-
long segmentGeneration = -1;
99-
if (someItemsNotFoundInTranslog) {
100-
Engine engine = indexShard.getEngineOrNull();
101-
if (engine == null) {
102-
throw new AlreadyClosedException("engine closed");
100+
long segmentGeneration = -1;
101+
if (someItemsNotFoundInTranslog) {
102+
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
103103
}
104-
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
105-
}
106-
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
104+
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
105+
});
107106
});
108107
}
109108

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,9 +1310,9 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function
13101310
throw new IllegalStateException("get operations not allowed on a legacy index");
13111311
}
13121312
if (translogOnly) {
1313-
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
1313+
return withEngine(engine -> engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper));
13141314
}
1315-
return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
1315+
return withEngine(engine -> engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper));
13161316
}
13171317

13181318
/**
@@ -3399,6 +3399,31 @@ public <R> R withEngine(Function<Engine, R> operation) {
33993399
return withEngine(operation, false);
34003400
}
34013401

3402+
/**
3403+
* Executes an operation (potentially throwing a checked exception) while preventing the shard's engine instance from being reset during the
3404+
* execution.
3405+
* If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The
3406+
* engine might be closed while the operation is executed.
3407+
*
3408+
* @param operation the operation to execute
3409+
* @return the result of the operation
3410+
* @param <R> the type of the result
3411+
* @param <E> the type of checked exception that the operation can potentially throw.
3412+
* @throws AlreadyClosedException if the current engine instance is {@code null}.
3413+
*/
3414+
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
3415+
assert assertCurrentThreadWithEngine();
3416+
assert operation != null;
3417+
3418+
engineResetLock.readLock().lock();
3419+
try {
3420+
var engine = getCurrentEngine(false);
3421+
return operation.apply(engine);
3422+
} finally {
3423+
engineResetLock.readLock().unlock();
3424+
}
3425+
}
3426+
34023427
/**
34033428
* Executes an operation while preventing the shard's engine instance from being reset during the execution
34043429
(see {@link #resetEngine(Consumer<Engine>)}).
@@ -3413,9 +3438,7 @@ public <R> R withEngine(Function<Engine, R> operation) {
34133438
* @param <R> the type of the result
34143439
*/
34153440
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
3416-
assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block");
3417-
assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block");
3418-
assert Transports.assertNotTransportThread("IndexShard.withEngine() can block");
3441+
assert assertCurrentThreadWithEngine();
34193442
assert operation != null;
34203443

34213444
engineResetLock.readLock().lock();
@@ -3427,6 +3450,14 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
34273450
}
34283451
}
34293452

3453+
private static boolean assertCurrentThreadWithEngine() {
3454+
var message = "method IndexShard#withEngine (or one of its variant) can block";
3455+
assert ClusterApplierService.assertNotClusterStateUpdateThread(message);
3456+
assert MasterService.assertNotMasterUpdateThread(message);
3457+
assert Transports.assertNotTransportThread(message);
3458+
return true;
3459+
}
3460+
34303461
public void startRecovery(
34313462
RecoveryState recoveryState,
34323463
PeerRecoveryTargetService recoveryTargetService,
@@ -4664,10 +4695,11 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera
46644695
* @param listener the listener to be notified when the shard is mutable
46654696
*/
46664697
public void ensureMutable(ActionListener<Void> listener, boolean permitAcquired) {
4667-
indexEventListener.beforeIndexShardMutableOperation(this, permitAcquired, listener.delegateFailure((l, unused) -> {
4668-
// TODO ES-10826: Acquire ref to engine and retry if it's immutable again?
4669-
l.onResponse(null);
4670-
}));
4698+
indexEventListener.beforeIndexShardMutableOperation(
4699+
this,
4700+
permitAcquired,
4701+
listener.delegateFailure((l, unused) -> l.onResponse(null))
4702+
);
46714703
}
46724704

46734705
// package-private for tests

0 commit comments

Comments
 (0)