Skip to content

Commit af7d32c

Browse files
committed
Fork to WRITE thread when failing shard (#84606)
If a replication operation fails then the primary will try and fail the replica. If this operation also fails (i.e. another shard copy has been promoted to primary) then the primary must fail itself. Today it does this on a transport thread. This is a problem because failing the shard needs to acquire a lock which may be held by another operation that is performing some potentially-long-running IO. With this commit we add an assertion that the engine is never failed on a transport thread, and adjust `ReplicationOperation` to fork the call to `failShard` to the `WRITE` threadpool. Without the change to `ReplicationOperation` the assertion is tripped by `testAckedIndexing`. Closes #84602
1 parent 018de1a commit af7d32c

File tree

3 files changed

+52
-15
lines changed

3 files changed

+52
-15
lines changed

docs/changelog/84606.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 84606
2+
summary: Fork to WRITE thread when failing shard
3+
area: Engine
4+
type: bug
5+
issues:
6+
- 84602

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.logging.log4j.Logger;
1111
import org.apache.logging.log4j.message.ParameterizedMessage;
1212
import org.apache.lucene.store.AlreadyClosedException;
13-
import org.elasticsearch.Assertions;
1413
import org.elasticsearch.ElasticsearchException;
1514
import org.elasticsearch.ExceptionsHelper;
1615
import org.elasticsearch.action.ActionListener;
@@ -23,6 +22,7 @@
2322
import org.elasticsearch.cluster.routing.ShardRouting;
2423
import org.elasticsearch.common.breaker.CircuitBreakingException;
2524
import org.elasticsearch.common.io.stream.StreamInput;
25+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2626
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.TimeValue;
@@ -317,26 +317,55 @@ private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointS
317317
private void onNoLongerPrimary(Exception failure) {
318318
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
319319
final boolean nodeIsClosing = cause instanceof NodeClosedException;
320-
final String message;
321320
if (nodeIsClosing) {
322-
message = String.format(
323-
Locale.ROOT,
324-
"node with primary [%s] is shutting down while failing replica shard",
325-
primary.routingEntry()
326-
);
327321
// We prefer not to fail the primary to avoid unnecessary warning log
328322
// when the node with the primary shard is gracefully shutting down.
323+
finishAsFailed(
324+
new RetryOnPrimaryException(
325+
primary.routingEntry().shardId(),
326+
String.format(
327+
Locale.ROOT,
328+
"node with primary [%s] is shutting down while failing replica shard",
329+
primary.routingEntry()
330+
),
331+
failure
332+
)
333+
);
329334
} else {
330-
if (Assertions.ENABLED) {
331-
if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) {
332-
throw new AssertionError("unexpected failure", failure);
335+
assert failure instanceof ShardStateAction.NoLongerPrimaryShardException : failure;
336+
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
337+
@Override
338+
protected void doRun() {
339+
// we are no longer the primary, fail ourselves and start over
340+
final var message = String.format(
341+
Locale.ROOT,
342+
"primary shard [%s] was demoted while failing replica shard",
343+
primary.routingEntry()
344+
);
345+
primary.failShard(message, failure);
346+
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
333347
}
334-
}
335-
// we are no longer the primary, fail ourselves and start over
336-
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry());
337-
primary.failShard(message, failure);
348+
349+
@Override
350+
public boolean isForceExecution() {
351+
return true;
352+
}
353+
354+
@Override
355+
public void onFailure(Exception e) {
356+
e.addSuppressed(failure);
357+
assert false : e;
358+
logger.error(new ParameterizedMessage("unexpected failure while failing primary [{}]", primary.routingEntry()), e);
359+
finishAsFailed(
360+
new RetryOnPrimaryException(
361+
primary.routingEntry().shardId(),
362+
String.format(Locale.ROOT, "unexpected failure while failing primary [%s]", primary.routingEntry()),
363+
e
364+
)
365+
);
366+
}
367+
});
338368
}
339-
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
340369
}
341370

342371
/**

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.index.translog.Translog;
6262
import org.elasticsearch.index.translog.TranslogStats;
6363
import org.elasticsearch.search.suggest.completion.CompletionStats;
64+
import org.elasticsearch.transport.Transports;
6465

6566
import java.io.Closeable;
6667
import java.io.IOException;
@@ -1099,6 +1100,7 @@ private void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
10991100
* The underlying store is marked corrupted iff failure is caused by index corruption
11001101
*/
11011102
public void failEngine(String reason, @Nullable Exception failure) {
1103+
assert Transports.assertNotTransportThread("failEngine can block on IO");
11021104
if (failure != null) {
11031105
maybeDie(reason, failure);
11041106
}

0 commit comments

Comments
 (0)