Skip to content

Commit c097a1b

Browse files
authored
Make IndexShard.resetEngine re-throw on failures (#137262)
Update IndexShard.resetEngine() to re-throw on failure so that critical issues bubble up to the caller. Change IndexShard.relocated() to take a CheckedBiConsumer instead of a BiConsumer, which makes it easier to pass consumers that call methods throwing checked exceptions.
1 parent f130e98 commit c097a1b

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
4848
import org.elasticsearch.cluster.service.ClusterApplierService;
4949
import org.elasticsearch.cluster.service.MasterService;
50+
import org.elasticsearch.common.CheckedBiConsumer;
5051
import org.elasticsearch.common.UUIDs;
5152
import org.elasticsearch.common.io.stream.BytesStreamOutput;
5253
import org.elasticsearch.common.lucene.Lucene;
@@ -794,18 +795,18 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
794795

795796
/**
796797
* Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
797-
* {@link BiConsumer} is executed after all operations are successfully blocked.
798+
* {@link CheckedBiConsumer} is executed after all operations are successfully blocked.
798799
*
799-
* @param consumer a {@link BiConsumer} that is executed after operations are blocked and that consumes the primary context as well as
800-
* a listener to resolve once it finished
800+
* @param consumer a {@link CheckedBiConsumer} that is executed after operations are blocked and that consumes the primary context as
801+
* well as a listener to resolve once it has finished
801802
* @param listener listener to resolve once this method's actions, including executing {@code consumer} in the non-failure case, complete
802803
* @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
803804
* @throws IllegalStateException if the relocation target is no longer part of the replication group
804805
*/
805806
public void relocated(
806807
final String targetNodeId,
807808
final String targetAllocationId,
808-
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
809+
final CheckedBiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>, Exception> consumer,
809810
final ActionListener<Void> listener
810811
) throws IllegalIndexShardStateException, IllegalStateException {
811812
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
@@ -4638,12 +4639,14 @@ public void afterRefresh(boolean didRefresh) {
46384639
* @param postResetNewEngineConsumer A consumer that will be called with the newly created engine after the reset
46394640
* is complete, allowing for post-reset operations on the new engine instance.
46404641
* The provided engine reference should not be retained by the consumer.
4642+
* @throws Exception if the reset could not be completed or the previous engine could not be closed
46414643
*/
4642-
public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
4644+
public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) throws Exception {
46434645
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
46444646
assert waitForEngineOrClosedShardListeners.isDone();
46454647
assert assertNoEngineResetLock();
46464648
Engine previousEngine = null;
4649+
Exception primaryException = null;
46474650
try {
46484651
synchronized (engineMutex) {
46494652
verifyNotClosed();
@@ -4667,16 +4670,26 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
46674670
} catch (Exception e) {
46684671
// we want to fail the shard in the case prepareForEngineReset throws
46694672
failShard("unable to reset engine", e);
4673+
throw e;
46704674
}
46714675
}
46724676
onSettingsChanged();
4677+
} catch (Exception e) {
4678+
primaryException = e;
4679+
throw e;
46734680
} finally {
46744681
if (previousEngine != null) {
46754682
assert engineResetLock.isReadLockedByCurrentThread();
46764683
try {
46774684
IOUtils.close(previousEngine);
46784685
} catch (Exception e) {
46794686
failShard("unable to close previous engine after reset", e);
4687+
4688+
if (primaryException != null) {
4689+
primaryException.addSuppressed(e);
4690+
} else {
4691+
throw e;
4692+
}
46804693
} finally {
46814694
engineResetLock.readLock().unlock();
46824695
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.cluster.routing.ShardRoutingState;
5151
import org.elasticsearch.cluster.routing.TestShardRouting;
5252
import org.elasticsearch.cluster.routing.UnassignedInfo;
53+
import org.elasticsearch.common.CheckedBiConsumer;
5354
import org.elasticsearch.common.Randomness;
5455
import org.elasticsearch.common.Strings;
5556
import org.elasticsearch.common.UUIDs;
@@ -4465,6 +4466,8 @@ public void prepareForEngineReset() throws IOException {
44654466
});
44664467
assertThat(preparedForReset.get(), equalTo(true));
44674468
l.onResponse(null);
4469+
} catch (Exception e) {
4470+
l.onFailure(e);
44684471
}
44694472
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
44704473
} catch (Exception e) {
@@ -5308,6 +5311,8 @@ public void prepareForEngineReset() throws IOException {
53085311
});
53095312
assertThat(preparedForReset.get(), equalTo(true));
53105313
l.onResponse(null);
5314+
} catch (Exception e) {
5315+
l.onFailure(e);
53115316
}
53125317
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
53135318
} catch (Exception e) {
@@ -5359,6 +5364,8 @@ public void prepareForEngineReset() throws IOException {
53595364
});
53605365
assertThat(preparedForReset.get(), equalTo(true));
53615366
l.onResponse(null);
5367+
} catch (Exception e) {
5368+
l.onFailure(e);
53625369
}
53635370
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
53645371
} catch (Exception e) {
@@ -5564,6 +5571,8 @@ public void prepareForEngineReset() throws IOException {
55645571
});
55655572
assertThat(preparedForReset.get(), equalTo(true));
55665573
l.onResponse(null);
5574+
} catch (Exception e) {
5575+
l.onFailure(e);
55675576
}
55685577
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
55695578
} catch (Exception e) {
@@ -5852,7 +5861,7 @@ public void advance(int ticks) {
58525861
private static void blockingCallRelocated(
58535862
IndexShard indexShard,
58545863
ShardRouting routing,
5855-
BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer
5864+
CheckedBiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>, Exception> consumer
58565865
) {
58575866
safeAwait(
58585867
(ActionListener<Void> listener) -> indexShard.relocated(

0 commit comments

Comments
 (0)