Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
Expand Down Expand Up @@ -235,10 +236,11 @@ private void getFromTranslog(
final var retryingListener = listener.delegateResponse((l, e) -> {
final var cause = ExceptionsHelper.unwrapCause(e);
logger.debug("get_from_translog failed", cause);
// All of the following exceptions can be thrown if the shard is relocated
if (cause instanceof ShardNotFoundException
|| cause instanceof IndexNotFoundException
|| cause instanceof IllegalIndexShardStateException
|| cause instanceof AlreadyClosedException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This AlreadyClosedException is thrown when the engine is closed due to relocation (not due to reset)

// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
logger.debug("retrying get_from_translog");
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
Expand All @@ -253,13 +255,7 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
if (cause instanceof AlreadyClosedException) {
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
tryGetFromTranslog(request, indexShard, node, l);
} else {
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
}
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.action.get;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -65,25 +64,24 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
assert getRequest.realtime();
ActionListener.completeWith(listener, () -> {
var result = indexShard.getService()
.getFromTranslog(
getRequest.id(),
getRequest.storedFields(),
getRequest.realtime(),
getRequest.version(),
getRequest.versionType(),
getRequest.fetchSourceContext(),
getRequest.isForceSyntheticSource()
);
long segmentGeneration = -1;
if (result == null) {
Engine engine = indexShard.getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine closed");
// Use the same engine instance for both getFromTranslog and getLastUnsafeSegmentGenerationForGets
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly required, but I think it makes sense to ensure it uses the same engine instance.

return indexShard.withEngineException(engine -> {
var result = indexShard.getService()
.getFromTranslog(
getRequest.id(),
getRequest.storedFields(),
getRequest.realtime(),
getRequest.version(),
getRequest.versionType(),
getRequest.fetchSourceContext(),
getRequest.isForceSyntheticSource()
);
long segmentGeneration = -1;
if (result == null) {
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
}
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
}
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MultiEngineGet;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -216,10 +217,11 @@ private void shardMultiGetFromTranslog(
final var retryingListener = listener.delegateResponse((l, e) -> {
final var cause = ExceptionsHelper.unwrapCause(e);
logger.debug("mget_from_translog[shard] failed", cause);
// All of the following exceptions can be thrown if the shard is relocated
if (cause instanceof ShardNotFoundException
|| cause instanceof IndexNotFoundException
|| cause instanceof IllegalIndexShardStateException
|| cause instanceof AlreadyClosedException) {
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
logger.debug("retrying mget_from_translog[shard]");
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
Expand All @@ -234,13 +236,7 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
// TODO AlreadyClosedException the engine reset should be fixed by ES-10826
if (cause instanceof AlreadyClosedException) {
// Do an additional retry just in case AlreadyClosedException didn't generate a cluster update
tryShardMultiGetFromTranslog(request, indexShard, node, l);
} else {
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
}
l.onFailure(new ElasticsearchException("Timed out retrying mget_from_translog[shard]", cause));
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.action.get;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -62,48 +61,48 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
assert multiGetShardRequest.realtime();
ActionListener.completeWith(listener, () -> {
var multiGetShardResponse = new MultiGetShardResponse();
var someItemsNotFoundInTranslog = false;
for (int i = 0; i < multiGetShardRequest.locations.size(); i++) {
var item = multiGetShardRequest.items.get(i);
try {
var result = indexShard.getService()
.getFromTranslog(
item.id(),
item.storedFields(),
multiGetShardRequest.realtime(),
item.version(),
item.versionType(),
item.fetchSourceContext(),
multiGetShardRequest.isForceSyntheticSource()
// Use the same engine instance for both getFromTranslog and getLastUnsafeSegmentGenerationForGets
return indexShard.withEngineException(engine -> {
var multiGetShardResponse = new MultiGetShardResponse();
var someItemsNotFoundInTranslog = false;

for (int i = 0; i < multiGetShardRequest.locations.size(); i++) {
var item = multiGetShardRequest.items.get(i);
try {
var result = indexShard.getService()
.getFromTranslog(
item.id(),
item.storedFields(),
multiGetShardRequest.realtime(),
item.version(),
item.versionType(),
item.fetchSourceContext(),
multiGetShardRequest.isForceSyntheticSource()
);
GetResponse getResponse = null;
if (result == null) {
someItemsNotFoundInTranslog = true;
} else {
getResponse = new GetResponse(result);
}
multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse);
} catch (RuntimeException | IOException e) {
if (TransportActions.isShardNotAvailableException(e)) {
throw e;
}
logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e);
multiGetShardResponse.add(
multiGetShardRequest.locations.get(i),
new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e)
);
GetResponse getResponse = null;
if (result == null) {
someItemsNotFoundInTranslog = true;
} else {
getResponse = new GetResponse(result);
}
multiGetShardResponse.add(multiGetShardRequest.locations.get(i), getResponse);
} catch (RuntimeException | IOException e) {
if (TransportActions.isShardNotAvailableException(e)) {
throw e;
}
logger.debug("failed to execute multi_get_from_translog for {}[id={}]: {}", shardId, item.id(), e);
multiGetShardResponse.add(
multiGetShardRequest.locations.get(i),
new MultiGetResponse.Failure(multiGetShardRequest.index(), item.id(), e)
);
}
}
long segmentGeneration = -1;
if (someItemsNotFoundInTranslog) {
Engine engine = indexShard.getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine closed");
long segmentGeneration = -1;
if (someItemsNotFoundInTranslog) {
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
}
segmentGeneration = engine.getLastUnsafeSegmentGenerationForGets();
}
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
});
});
}

Expand Down
50 changes: 41 additions & 9 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1310,9 +1310,9 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function
throw new IllegalStateException("get operations not allowed on a legacy index");
}
if (translogOnly) {
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
return withEngine(engine -> engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper));
}
return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
return withEngine(engine -> engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper));
}

/**
Expand Down Expand Up @@ -3399,6 +3399,31 @@ public <R> R withEngine(Function<Engine, R> operation) {
return withEngine(operation, false);
}

/**
 * Runs an operation against the shard's current engine while holding the engine reset read lock, so that the engine instance cannot
 * be reset before the operation returns. Unlike {@link #withEngine(Function)}, the operation may throw a checked exception, which is
 * propagated to the caller unchanged.
 * <p>
 * If the current engine instance is {@code null}, an {@link AlreadyClosedException} is thrown and the operation is not executed.
 * Holding the lock only prevents a reset: the engine might still be closed while the operation is running.
 *
 * @param operation the operation to execute with the current engine
 * @param <R>       the type of the result
 * @param <E>       the type of checked exception that the operation can potentially throw
 * @return the result of the operation
 * @throws AlreadyClosedException if the current engine instance is {@code null}
 * @throws E                      if the operation itself throws
 */
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
    assert assertCurrentThreadWithEngine();
    assert operation != null;

    final var resetReadLock = engineResetLock.readLock();
    resetReadLock.lock();
    try {
        // The engine is looked up only once the lock is held, so the instance handed to the operation cannot be swapped by a reset.
        return operation.apply(getCurrentEngine(false));
    } finally {
        resetReadLock.unlock();
    }
}

/**
* Executes an operation while preventing the shard's engine instance from being reset during the execution
* (see {@link #resetEngine(Consumer)}).
Expand All @@ -3413,9 +3438,7 @@ public <R> R withEngine(Function<Engine, R> operation) {
* @param <R> the type of the result
*/
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block");
assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block");
assert Transports.assertNotTransportThread("IndexShard.withEngine() can block");
assert assertCurrentThreadWithEngine();
assert operation != null;

engineResetLock.readLock().lock();
Expand All @@ -3427,6 +3450,14 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
}
}

/**
 * Assertion helper shared by the {@code withEngine} variants, which take the engine reset lock and can therefore block.
 * Delegates, in order, to the cluster state update thread, master update thread and transport thread checks, passing each the same
 * explanatory message.
 *
 * @return always {@code true}, so that the method can be called from within an {@code assert} statement
 */
private static boolean assertCurrentThreadWithEngine() {
    final String reason = "method IndexShard#withEngine (or one of its variant) can block";
    assert ClusterApplierService.assertNotClusterStateUpdateThread(reason)
        && MasterService.assertNotMasterUpdateThread(reason)
        && Transports.assertNotTransportThread(reason);
    return true;
}

public void startRecovery(
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
Expand Down Expand Up @@ -4664,10 +4695,11 @@ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGenera
* @param listener the listener to be notified when the shard is mutable
*/
public void ensureMutable(ActionListener<Void> listener, boolean permitAcquired) {
indexEventListener.beforeIndexShardMutableOperation(this, permitAcquired, listener.delegateFailure((l, unused) -> {
// TODO ES-10826: Acquire ref to engine and retry if it's immutable again?
l.onResponse(null);
}));
indexEventListener.beforeIndexShardMutableOperation(
this,
permitAcquired,
listener.delegateFailure((l, unused) -> l.onResponse(null))
);
}

// package-private for tests
Expand Down