Skip to content

Commit 6e1255f

Browse files
committed
update + few changes
1 parent ce9b0c3 commit 6e1255f

File tree

5 files changed

+105
-55
lines changed

5 files changed

+105
-55
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.apache.lucene.store.AlreadyClosedException;
1415
import org.elasticsearch.ElasticsearchException;
1516
import org.elasticsearch.ExceptionsHelper;
1617
import org.elasticsearch.action.ActionListener;
@@ -39,6 +40,7 @@
3940
import org.elasticsearch.index.IndexService;
4041
import org.elasticsearch.index.engine.Engine;
4142
import org.elasticsearch.index.get.GetResult;
43+
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
4244
import org.elasticsearch.index.shard.IndexShard;
4345
import org.elasticsearch.index.shard.ShardId;
4446
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -234,7 +236,11 @@ private void getFromTranslog(
234236
final var retryingListener = listener.delegateResponse((l, e) -> {
235237
final var cause = ExceptionsHelper.unwrapCause(e);
236238
logger.debug("get_from_translog failed", cause);
237-
if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) {
239+
// All of the following exceptions can be thrown if the shard is relocated
240+
if (cause instanceof ShardNotFoundException
241+
|| cause instanceof IndexNotFoundException
242+
|| cause instanceof IllegalIndexShardStateException
243+
|| cause instanceof AlreadyClosedException) {
238244
logger.debug("retrying get_from_translog");
239245
observer.waitForNextChange(new ClusterStateObserver.Listener() {
240246
@Override

server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,24 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
6464
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
6565
assert getRequest.realtime();
6666
ActionListener.completeWith(listener, () -> {
67-
var result = indexShard.getService()
68-
.getFromTranslog(
69-
getRequest.id(),
70-
getRequest.storedFields(),
71-
getRequest.realtime(),
72-
getRequest.version(),
73-
getRequest.versionType(),
74-
getRequest.fetchSourceContext(),
75-
getRequest.isForceSyntheticSource()
76-
);
77-
long segmentGeneration = -1;
78-
if (result == null) {
79-
segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets);
80-
}
81-
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
67+
// Keeps the same engine instance for both getFromTranslog and getLastUnsafeSegmentGenerationForGets
68+
return indexShard.withEngineException(engine -> {
69+
var result = indexShard.getService()
70+
.getFromTranslog(
71+
getRequest.id(),
72+
getRequest.storedFields(),
73+
getRequest.realtime(),
74+
getRequest.version(),
75+
getRequest.versionType(),
76+
getRequest.fetchSourceContext(),
77+
getRequest.isForceSyntheticSource()
78+
);
79+
long segmentGeneration = -1;
80+
if (result == null) {
81+
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
82+
}
83+
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
84+
});
8285
});
8386
}
8487

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.apache.lucene.store.AlreadyClosedException;
1415
import org.elasticsearch.ElasticsearchException;
1516
import org.elasticsearch.ExceptionsHelper;
1617
import org.elasticsearch.action.ActionListener;
@@ -39,6 +40,7 @@
3940
import org.elasticsearch.index.IndexService;
4041
import org.elasticsearch.index.engine.Engine;
4142
import org.elasticsearch.index.get.GetResult;
43+
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
4244
import org.elasticsearch.index.shard.IndexShard;
4345
import org.elasticsearch.index.shard.MultiEngineGet;
4446
import org.elasticsearch.index.shard.ShardId;
@@ -215,7 +217,11 @@ private void shardMultiGetFromTranslog(
215217
final var retryingListener = listener.delegateResponse((l, e) -> {
216218
final var cause = ExceptionsHelper.unwrapCause(e);
217219
logger.debug("mget_from_translog[shard] failed", cause);
218-
if (cause instanceof ShardNotFoundException || cause instanceof IndexNotFoundException) {
220+
// All of the following exceptions can be thrown if the shard is relocated
221+
if (cause instanceof ShardNotFoundException
222+
|| cause instanceof IndexNotFoundException
223+
|| cause instanceof IllegalIndexShardStateException
224+
|| cause instanceof AlreadyClosedException) {
219225
logger.debug("retrying mget_from_translog[shard]");
220226
observer.waitForNextChange(new ClusterStateObserver.Listener() {
221227
@Override

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -61,44 +61,48 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
6161
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
6262
assert multiGetShardRequest.realtime();
6363
ActionListener.completeWith(listener, () -> {
64-
var multiGetShardResponse = new MultiGetShardResponse();
65-
var someItemsNotFoundInTranslog = false;
66-
for (int i = 0; i < multiGetShardRequest.locations.size(); i++) {
67-
var item = multiGetShardRequest.items.get(i);
68-
try {
69-
var result = indexShard.getService()
70-
.getFromTranslog(
71-
item.id(),
72-
item.storedFields(),
73-
multiGetShardRequest.realtime(),
74-
item.version(),
75-
item.versionType(),
76-
item.fetchSourceContext(),
77-
multiGetShardRequest.isForceSyntheticSource()
64+
// Keeps the same engine instance for both getFromTranslog and getLastUnsafeSegmentGenerationForGets
65+
return indexShard.withEngineException(engine -> {
66+
var multiGetShardResponse = new MultiGetShardResponse();
67+
var someItemsNotFoundInTranslog = false;
68+
69+
for (int i = 0; i < multiGetShardRequest.locations.size(); i++) {
70+
var item = multiGetShardRequest.items.get(i);
71+
try {
72+
var result = indexShard.getService()
73+
.getFromTranslog(
74+
item.id(),
75+
item.storedFields(),
76+
multiGetShardRequest.realtime(),
77+
item.version(),
78+
item.versionType(),
79+
item.fetchSourceContext(),
80+
multiGetShardRequest.isForceSyntheticSource()
81+
);
82+
GetResponse getResponse = null;
83+
if (result == null) {
84+
someItemsNotFoundInTranslog = true;
85+
} else {
86+
getResponse = new GetResponse(result);
87+
}
88+
multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse);
89+
} catch (RuntimeException | IOException e) {
90+
if (TransportActions.isShardNotAvailableException(e)) {
91+
throw e;
92+
}
93+
logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e);
94+
multiGetShardResponse.add(
95+
multiGetShardRequest.locations.get(i),
96+
new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e)
7897
);
79-
GetResponse getResponse = null;
80-
if (result == null) {
81-
someItemsNotFoundInTranslog = true;
82-
} else {
83-
getResponse = new GetResponse(result);
8498
}
85-
multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse);
86-
} catch (RuntimeException | IOException e) {
87-
if (TransportActions.isShardNotAvailableException(e)) {
88-
throw e;
89-
}
90-
logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e);
91-
multiGetShardResponse.add(
92-
multiGetShardRequest.locations.get(i),
93-
new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e)
94-
);
9599
}
96-
}
97-
long segmentGeneration = -1;
98-
if (someItemsNotFoundInTranslog) {
99-
segmentGeneration = indexShard.withEngine(Engine::getLastUnsafeSegmentGenerationForGets);
100-
}
101-
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
100+
long segmentGeneration = -1;
101+
if (someItemsNotFoundInTranslog) {
102+
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
103+
}
104+
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
105+
});
102106
});
103107
}
104108

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3399,6 +3399,31 @@ public <R> R withEngine(Function<Engine, R> operation) {
33993399
return withEngine(operation, false);
34003400
}
34013401

3402+
/**
3403+
* Executes an operation (potentially throwing a checked exception) while preventing the shard's engine instance to be reset during the
3404+
* execution.
3405+
* If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The
3406+
* engine might be closed while the operation is executed.
3407+
*
3408+
* @param operation the operation to execute
3409+
* @return the result of the operation
3410+
* @param <R> the type of the result
3411+
* @param <E> the type of checked exception that the operation can potentially throw.
3412+
* @throws AlreadyClosedException if the current engine instance is {@code null}.
3413+
*/
3414+
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
3415+
assert assertCurrentThreadWithEngine();
3416+
assert operation != null;
3417+
3418+
engineResetLock.readLock().lock();
3419+
try {
3420+
var engine = getCurrentEngine(false);
3421+
return operation.apply(engine);
3422+
} finally {
3423+
engineResetLock.readLock().unlock();
3424+
}
3425+
}
3426+
34023427
/**
34033428
* Executes an operation while preventing the shard's engine instance to be reset during the execution
34043429
* (see {@link #resetEngine(Consumer<Engine>)}).
@@ -3413,9 +3438,7 @@ public <R> R withEngine(Function<Engine, R> operation) {
34133438
* @param <R> the type of the result
34143439
*/
34153440
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
3416-
assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block");
3417-
assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block");
3418-
assert Transports.assertNotTransportThread("IndexShard.withEngine() can block");
3441+
assert assertCurrentThreadWithEngine();
34193442
assert operation != null;
34203443

34213444
engineResetLock.readLock().lock();
@@ -3427,6 +3450,14 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
34273450
}
34283451
}
34293452

3453+
private static boolean assertCurrentThreadWithEngine() {
3454+
var message = "method IndexShard#withEngine (or one of its variant) can block";
3455+
assert ClusterApplierService.assertNotClusterStateUpdateThread(message);
3456+
assert MasterService.assertNotMasterUpdateThread(message);
3457+
assert Transports.assertNotTransportThread(message);
3458+
return true;
3459+
}
3460+
34303461
public void startRecovery(
34313462
RecoveryState recoveryState,
34323463
PeerRecoveryTargetService recoveryTargetService,

0 commit comments

Comments
 (0)