Skip to content

Commit 630af90

Browse files
committed
Assert engine not got by trasnport worker threads
Relates ES-12215
1 parent 2331e6b commit 630af90

File tree

2 files changed

+68
-21
lines changed

2 files changed

+68
-21
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,11 +1463,12 @@ public StoreStats storeStats() {
14631463
}
14641464

14651465
public MergeStats mergeStats() {
1466-
final Engine engine = getEngineOrNull();
1467-
if (engine == null) {
1468-
return new MergeStats();
1469-
}
1470-
return engine.getMergeStats();
1466+
return tryWithEngineOrNull(engine -> {
1467+
if (engine == null) {
1468+
return new MergeStats();
1469+
}
1470+
return engine.getMergeStats();
1471+
});
14711472
}
14721473

14731474
public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
@@ -1715,14 +1716,14 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
17151716
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
17161717
readAllowed();
17171718
markSearcherAccessed();
1718-
final Engine engine = getEngine();
1719+
final Engine engine = getEngine(true); // should primarily happen on search nodes
17191720
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
17201721
}
17211722

17221723
public Engine.Searcher acquireSearcher(String source) {
17231724
readAllowed();
17241725
markSearcherAccessed();
1725-
final Engine engine = getEngine();
1726+
final Engine engine = getEngine(true); // should primarily happen on search nodes
17261727
return engine.acquireSearcher(source, Engine.SearcherScope.EXTERNAL, this::wrapSearcher);
17271728
}
17281729

@@ -2669,7 +2670,33 @@ boolean shouldRollTranslogGeneration() {
26692670
public void onSettingsChanged() {
26702671
engineResetLock.readLock().lock();
26712672
try {
2672-
var engine = getCurrentEngine(true);
2673+
// TODO this may be called by a cluster state update thread and we need to consider whether it is an issue
2674+
// java.lang.AssertionError: Expected current thread
2675+
// [Thread[#97,elasticsearch[node_t3][clusterApplierService#updateTask][T#1],5,TGRP-StatelessIT]] to not be the cluster state
2676+
// update thread. Reason: [method IndexShard#getCurrentEngine (or one of its variant) can block]
2677+
// at __randomizedtesting.SeedInfo.seed([6244501B70969C37]:0)
2678+
// at org.elasticsearch.cluster.service.ClusterApplierService.assertNotClusterStateUpdateThread(ClusterApplierService.java:386)
2679+
// at org.elasticsearch.index.shard.IndexShard.assertCurrentThreadWithEngine(IndexShard.java:3550)
2680+
// at org.elasticsearch.index.shard.IndexShard.getCurrentEngine(IndexShard.java:3430)
2681+
// at org.elasticsearch.index.shard.IndexShard.onSettingsChanged(IndexShard.java:2673)
2682+
// at org.elasticsearch.index.IndexService.updateMetadata(IndexService.java:1013)
2683+
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.updateIndices(IndicesClusterStateService.java:662)
2684+
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.doApplyClusterState(IndicesClusterStateService.java:322)
2685+
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:278)
2686+
// at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:572)
2687+
// at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:558)
2688+
// at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:531)
2689+
// at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:460)
2690+
// at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:159)
2691+
// at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:1000)
2692+
// at
2693+
// org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:218)
2694+
// at
2695+
// org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:184)
2696+
// at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1095)
2697+
// at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:619)
2698+
// at java.base/java.lang.Thread.run(Thread.java:1447)
2699+
var engine = getCurrentEngine(true, true);
26732700
if (engine != null) {
26742701
engine.onSettingsChanged();
26752702
}
@@ -3053,7 +3080,11 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
30533080
* @return the local checkpoint
30543081
*/
30553082
public long getLocalCheckpoint() {
3056-
return getEngine().getPersistedLocalCheckpoint();
3083+
return getLocalCheckpoint(false);
3084+
}
3085+
3086+
public long getLocalCheckpoint(boolean skipAssertions) {
3087+
return getEngine(skipAssertions).getPersistedLocalCheckpoint();
30573088
}
30583089

30593090
/**
@@ -3158,7 +3189,7 @@ public PendingReplicationActions getPendingReplicationActions() {
31583189
*/
31593190
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
31603191
assert assertReplicationTarget();
3161-
final long localCheckpoint = getLocalCheckpoint();
3192+
final long localCheckpoint = getLocalCheckpoint(true);
31623193
if (globalCheckpoint > localCheckpoint) {
31633194
/*
31643195
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
@@ -3398,9 +3429,14 @@ private void doCheckIndex() throws IOException {
33983429
*/
33993430
@Deprecated
34003431
Engine getEngine() {
3432+
return getEngine(false);
3433+
}
3434+
3435+
@Deprecated
3436+
Engine getEngine(boolean skipAssertions) {
34013437
engineResetLock.readLock().lock();
34023438
try {
3403-
return getCurrentEngine(false);
3439+
return getCurrentEngine(false, skipAssertions);
34043440
} finally {
34053441
engineResetLock.readLock().unlock();
34063442
}
@@ -3416,13 +3452,19 @@ Engine getEngine() {
34163452
public Engine getEngineOrNull() {
34173453
engineResetLock.readLock().lock();
34183454
try {
3419-
return getCurrentEngine(true);
3455+
return getCurrentEngine(true, false);
34203456
} finally {
34213457
engineResetLock.readLock().unlock();
34223458
}
34233459
}
34243460

3425-
private Engine getCurrentEngine(boolean allowNoEngine) {
3461+
private Engine getCurrentEngine(boolean allowNoEngine, boolean skipAssertions) {
3462+
// We only reset a shard when it's relocating (primary relocation) or started (unhollowing)
3463+
boolean shardRoutingEngineResettable = shardRouting.started() || shardRouting.relocating();
3464+
assert skipAssertions
3465+
|| shardRoutingEngineResettable == false
3466+
|| state() != IndexShardState.STARTED // exclude getting the engine when not started. We only reset started shards.
3467+
|| assertCurrentThreadWithEngine("method IndexShard#getCurrentEngine (or one of its variant) can block");
34263468
assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */;
34273469
var engine = currentEngine.get();
34283470
if (engine == null && allowNoEngine == false) {
@@ -3489,12 +3531,12 @@ public <R> R withEngine(Function<Engine, R> operation) {
34893531
* @throws AlreadyClosedException if the current engine instance is {@code null}.
34903532
*/
34913533
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
3492-
assert assertCurrentThreadWithEngine();
3534+
assert assertCurrentThreadWithEngine("method IndexShard#withEngineException (or one of its variant) can block");
34933535
assert operation != null;
34943536

34953537
engineResetLock.readLock().lock();
34963538
try {
3497-
var engine = getCurrentEngine(false);
3539+
var engine = getCurrentEngine(false, false);
34983540
return operation.apply(engine);
34993541
} finally {
35003542
engineResetLock.readLock().unlock();
@@ -3526,12 +3568,12 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35263568
if (noEngineDuringReset) {
35273569
locked = engineResetLock.readLock().tryLock();
35283570
} else {
3529-
assert assertCurrentThreadWithEngine();
3571+
assert assertCurrentThreadWithEngine("method IndexShard#withEngine (or one of its variant) can block");
35303572
engineResetLock.readLock().lock();
35313573
}
35323574
if (locked) {
35333575
try {
3534-
var engine = getCurrentEngine(allowNoEngine);
3576+
var engine = getCurrentEngine(allowNoEngine, noEngineDuringReset);
35353577
return operation.apply(engine);
35363578
} finally {
35373579
engineResetLock.readLock().unlock();
@@ -3541,8 +3583,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35413583
}
35423584
}
35433585

3544-
private static boolean assertCurrentThreadWithEngine() {
3545-
var message = "method IndexShard#withEngine (or one of its variant) can block";
3586+
private static boolean assertCurrentThreadWithEngine(String message) {
35463587
assert ClusterApplierService.assertNotClusterStateUpdateThread(message);
35473588
assert MasterService.assertNotMasterUpdateThread(message);
35483589
assert Transports.assertNotTransportThread(message);
@@ -4605,7 +4646,7 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
46054646
try {
46064647
engineResetLock.writeLock().lock();
46074648
try {
4608-
var engine = getCurrentEngine(false);
4649+
var engine = getCurrentEngine(false, true);
46094650
engine.prepareForEngineReset();
46104651
var newEngine = createEngine(newEngineConfig(replicationTracker));
46114652
getAndSetCurrentEngine(newEngine);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.ccr.action;
88

9+
import org.apache.lucene.util.SetOnce;
910
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.ActionRequestValidationException;
@@ -403,7 +404,12 @@ protected void asyncShardOperation(final Request request, final ShardId shardId,
403404
throws IOException {
404405
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
405406
final IndexShard indexShard = indexService.getShard(request.getShard().id());
406-
final SeqNoStats seqNoStats = indexShard.seqNoStats();
407+
final SetOnce<SeqNoStats> seqNoStatsSetOnce = new SetOnce<>();
408+
threadPool.generic().execute(() -> { seqNoStatsSetOnce.set(indexShard.seqNoStats()); });
409+
while (seqNoStatsSetOnce.get() == null) {
410+
;
411+
}
412+
final SeqNoStats seqNoStats = seqNoStatsSetOnce.get();
407413

408414
if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
409415
logger.trace(

0 commit comments

Comments
 (0)