Skip to content

Commit dfa7aeb

Browse files
committed
Assert engine not got by trasnport worker threads
Relates ES-12215
1 parent 7f57c4a commit dfa7aeb

File tree

2 files changed

+27
-17
lines changed

2 files changed

+27
-17
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,11 +1458,12 @@ public StoreStats storeStats() {
14581458
}
14591459

14601460
public MergeStats mergeStats() {
1461-
final Engine engine = getEngineOrNull();
1462-
if (engine == null) {
1463-
return new MergeStats();
1464-
}
1465-
return engine.getMergeStats();
1461+
return tryWithEngineOrNull(engine -> {
1462+
if (engine == null) {
1463+
return new MergeStats();
1464+
}
1465+
return engine.getMergeStats();
1466+
});
14661467
}
14671468

14681469
public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
@@ -2664,7 +2665,7 @@ boolean shouldRollTranslogGeneration() {
26642665
public void onSettingsChanged() {
26652666
engineResetLock.readLock().lock();
26662667
try {
2667-
var engine = getCurrentEngine(true);
2668+
var engine = getCurrentEngine(true, false);
26682669
if (engine != null) {
26692670
engine.onSettingsChanged();
26702671
}
@@ -3379,7 +3380,7 @@ private void doCheckIndex() throws IOException {
33793380
Engine getEngine() {
33803381
engineResetLock.readLock().lock();
33813382
try {
3382-
return getCurrentEngine(false);
3383+
return getCurrentEngine(false, false);
33833384
} finally {
33843385
engineResetLock.readLock().unlock();
33853386
}
@@ -3395,13 +3396,17 @@ Engine getEngine() {
33953396
public Engine getEngineOrNull() {
33963397
engineResetLock.readLock().lock();
33973398
try {
3398-
return getCurrentEngine(true);
3399+
return getCurrentEngine(true, false);
33993400
} finally {
34003401
engineResetLock.readLock().unlock();
34013402
}
34023403
}
34033404

3404-
private Engine getCurrentEngine(boolean allowNoEngine) {
3405+
private Engine getCurrentEngine(boolean allowNoEngine, boolean skipAssertions) {
3406+
assert skipAssertions
3407+
|| shardRouting.started() == false // exclude getting the engine when not started. We only reset started engines.
3408+
|| state() != IndexShardState.STARTED // exclude getting the engine when not started. We only reset started engines.
3409+
|| assertCurrentThreadWithEngine("method IndexShard#getCurrentEngine (or one of its variant) can block");
34053410
assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */;
34063411
var engine = currentEngine.get();
34073412
if (engine == null && allowNoEngine == false) {
@@ -3468,12 +3473,12 @@ public <R> R withEngine(Function<Engine, R> operation) {
34683473
* @throws AlreadyClosedException if the current engine instance is {@code null}.
34693474
*/
34703475
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
3471-
assert assertCurrentThreadWithEngine();
3476+
assert assertCurrentThreadWithEngine("method IndexShard#withEngineException (or one of its variant) can block");
34723477
assert operation != null;
34733478

34743479
engineResetLock.readLock().lock();
34753480
try {
3476-
var engine = getCurrentEngine(false);
3481+
var engine = getCurrentEngine(false, false);
34773482
return operation.apply(engine);
34783483
} finally {
34793484
engineResetLock.readLock().unlock();
@@ -3505,12 +3510,12 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35053510
if (noEngineDuringReset) {
35063511
locked = engineResetLock.readLock().tryLock();
35073512
} else {
3508-
assert assertCurrentThreadWithEngine();
3513+
assert assertCurrentThreadWithEngine("method IndexShard#withEngine (or one of its variant) can block");
35093514
engineResetLock.readLock().lock();
35103515
}
35113516
if (locked) {
35123517
try {
3513-
var engine = getCurrentEngine(allowNoEngine);
3518+
var engine = getCurrentEngine(allowNoEngine, noEngineDuringReset);
35143519
return operation.apply(engine);
35153520
} finally {
35163521
engineResetLock.readLock().unlock();
@@ -3520,8 +3525,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35203525
}
35213526
}
35223527

3523-
private static boolean assertCurrentThreadWithEngine() {
3524-
var message = "method IndexShard#withEngine (or one of its variant) can block";
3528+
private static boolean assertCurrentThreadWithEngine(String message) {
35253529
assert ClusterApplierService.assertNotClusterStateUpdateThread(message);
35263530
assert MasterService.assertNotMasterUpdateThread(message);
35273531
assert Transports.assertNotTransportThread(message);
@@ -4551,7 +4555,7 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
45514555
try {
45524556
engineResetLock.writeLock().lock();
45534557
try {
4554-
var engine = getCurrentEngine(false);
4558+
var engine = getCurrentEngine(false, true);
45554559
engine.prepareForEngineReset();
45564560
var newEngine = createEngine(newEngineConfig(replicationTracker));
45574561
getAndSetCurrentEngine(newEngine);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.ccr.action;
88

9+
import org.apache.lucene.util.SetOnce;
910
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.ActionRequestValidationException;
@@ -403,7 +404,12 @@ protected void asyncShardOperation(final Request request, final ShardId shardId,
403404
throws IOException {
404405
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
405406
final IndexShard indexShard = indexService.getShard(request.getShard().id());
406-
final SeqNoStats seqNoStats = indexShard.seqNoStats();
407+
final SetOnce<SeqNoStats> seqNoStatsSetOnce = new SetOnce<>();
408+
threadPool.generic().execute(() -> { seqNoStatsSetOnce.set(indexShard.seqNoStats()); });
409+
while (seqNoStatsSetOnce.get() == null) {
410+
;
411+
}
412+
final SeqNoStats seqNoStats = seqNoStatsSetOnce.get();
407413

408414
if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
409415
logger.trace(

0 commit comments

Comments
 (0)