Skip to content

Commit bfaab93

Browse files
committed
Assert engine not got by trasnport worker threads
Relates ES-12215
1 parent 334b3ad commit bfaab93

File tree

5 files changed

+101
-28
lines changed

5 files changed

+101
-28
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 79 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,7 +1400,11 @@ public CommitStats commitStats() {
14001400
* @throws AlreadyClosedException if shard is closed
14011401
*/
14021402
public SeqNoStats seqNoStats() {
1403-
return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
1403+
return seqNoStats(false);
1404+
}
1405+
1406+
public SeqNoStats seqNoStats(boolean skipAssertions) {
1407+
return getEngine(skipAssertions).getSeqNoStats(replicationTracker.getGlobalCheckpoint());
14041408
}
14051409

14061410
public IndexingStats indexingStats() {
@@ -1463,11 +1467,12 @@ public StoreStats storeStats() {
14631467
}
14641468

14651469
public MergeStats mergeStats() {
1466-
final Engine engine = getEngineOrNull();
1467-
if (engine == null) {
1468-
return new MergeStats();
1469-
}
1470-
return engine.getMergeStats();
1470+
return tryWithEngineOrNull(engine -> {
1471+
if (engine == null) {
1472+
return new MergeStats();
1473+
}
1474+
return engine.getMergeStats();
1475+
});
14711476
}
14721477

14731478
public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
@@ -1707,14 +1712,14 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
17071712
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
17081713
readAllowed();
17091714
markSearcherAccessed();
1710-
final Engine engine = getEngine();
1715+
final Engine engine = getEngine(true); // should primarily happen on search nodes
17111716
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
17121717
}
17131718

17141719
public Engine.Searcher acquireSearcher(String source) {
17151720
readAllowed();
17161721
markSearcherAccessed();
1717-
final Engine engine = getEngine();
1722+
final Engine engine = getEngine(true); // should primarily happen on search nodes
17181723
return engine.acquireSearcher(source, Engine.SearcherScope.EXTERNAL, this::wrapSearcher);
17191724
}
17201725

@@ -2661,7 +2666,35 @@ boolean shouldRollTranslogGeneration() {
26612666
public void onSettingsChanged() {
26622667
engineResetLock.readLock().lock();
26632668
try {
2664-
var engine = getCurrentEngine(true);
2669+
// TODO this may be called by a cluster state update thread and we need to consider whether it is an issue
2670+
// java.lang.AssertionError: Expected current thread
2671+
// [Thread[#97,elasticsearch[node_t3][clusterApplierService#updateTask][T#1],5,TGRP-StatelessIT]] to not be the cluster state
2672+
// update thread. Reason: [method IndexShard#getCurrentEngine (or one of its variant) can block]
2673+
// at __randomizedtesting.SeedInfo.seed([6244501B70969C37]:0)
2674+
// at org.elasticsearch.cluster.service.ClusterApplierService.assertNotClusterStateUpdateThread(ClusterApplierService.java:386)
2675+
// at org.elasticsearch.index.shard.IndexShard.assertCurrentThreadWithEngine(IndexShard.java:3550)
2676+
// at org.elasticsearch.index.shard.IndexShard.getCurrentEngine(IndexShard.java:3430)
2677+
// at org.elasticsearch.index.shard.IndexShard.onSettingsChanged(IndexShard.java:2673)
2678+
// at org.elasticsearch.index.IndexService.updateMetadata(IndexService.java:1013)
2679+
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.updateIndices(IndicesClusterStateService.java:662)
2680+
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.doApplyClusterState(IndicesClusterStateService.java:322)
2681+
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:278)
2682+
// at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:572)
2683+
// at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:558)
2684+
// at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:531)
2685+
// at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:460)
2686+
// at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:159)
2687+
// at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:1000)
2688+
// at
2689+
// org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean
2690+
// (PrioritizedEsThreadPoolExecutor.java:218)
2691+
// at
2692+
// org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run
2693+
// (PrioritizedEsThreadPoolExecutor.java:184)
2694+
// at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1095)
2695+
// at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:619)
2696+
// at java.base/java.lang.Thread.run(Thread.java:1447)
2697+
var engine = getCurrentEngine(true, true);
26652698
if (engine != null) {
26662699
engine.onSettingsChanged();
26672700
}
@@ -2674,15 +2707,18 @@ public void onSettingsChanged() {
26742707
* Acquires a lock on Lucene soft-deleted documents to prevent them from being trimmed
26752708
*/
26762709
public Closeable acquireHistoryRetentionLock() {
2677-
return getEngine().acquireHistoryRetentionLock();
2710+
// Skip assertions since this is called either during recovery, or under a primary permit (thus during ingestion) which is not
2711+
// something that can happen concurrently with hollowing (primary relocation holds permits) or unhollowing (we block ingestion).
2712+
return getEngine(true).acquireHistoryRetentionLock();
26782713
}
26792714

26802715
/**
26812716
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
26822717
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
26832718
*/
26842719
public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) {
2685-
return getEngine().hasCompleteOperationHistory(reason, startingSeqNo);
2720+
// Skip assertions since this is called during recovery.
2721+
return getEngine(true).hasCompleteOperationHistory(reason, startingSeqNo);
26862722
}
26872723

26882724
/**
@@ -2691,7 +2727,8 @@ public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) {
26912727
* @return the minimum retained sequence number
26922728
*/
26932729
public long getMinRetainedSeqNo() {
2694-
return getEngine().getMinRetainedSeqNo();
2730+
// Skip assertions since this is called under a primary permit
2731+
return getEngine(true).getMinRetainedSeqNo();
26952732
}
26962733

26972734
/**
@@ -3045,7 +3082,11 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
30453082
* @return the local checkpoint
30463083
*/
30473084
public long getLocalCheckpoint() {
3048-
return getEngine().getPersistedLocalCheckpoint();
3085+
return getLocalCheckpoint(false);
3086+
}
3087+
3088+
public long getLocalCheckpoint(boolean skipAssertions) {
3089+
return getEngine(skipAssertions).getPersistedLocalCheckpoint();
30493090
}
30503091

30513092
/**
@@ -3087,7 +3128,10 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
30873128
}
30883129
assert assertPrimaryMode();
30893130
// only sync if there are no operations in flight, or when using async durability
3090-
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
3131+
// Skip assertions because this is called either from an async thread (so OK to block) or from the post operation of a replication
3132+
// action under a primary permit -- which would not be concurrent with either hollowing/unhollowing (since ingestion is blocked
3133+
// before hollowing, due to the primary relocation, and due to our own blocker during unhollowing).
3134+
final SeqNoStats stats = getEngine(true).getSeqNoStats(replicationTracker.getGlobalCheckpoint());
30913135
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
30923136
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
30933137
final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync();
@@ -3150,7 +3194,7 @@ public PendingReplicationActions getPendingReplicationActions() {
31503194
*/
31513195
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
31523196
assert assertReplicationTarget();
3153-
final long localCheckpoint = getLocalCheckpoint();
3197+
final long localCheckpoint = getLocalCheckpoint(true);
31543198
if (globalCheckpoint > localCheckpoint) {
31553199
/*
31563200
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
@@ -3390,9 +3434,14 @@ private void doCheckIndex() throws IOException {
33903434
*/
33913435
@Deprecated
33923436
Engine getEngine() {
3437+
return getEngine(false);
3438+
}
3439+
3440+
@Deprecated
3441+
Engine getEngine(boolean skipAssertions) {
33933442
engineResetLock.readLock().lock();
33943443
try {
3395-
return getCurrentEngine(false);
3444+
return getCurrentEngine(false, skipAssertions);
33963445
} finally {
33973446
engineResetLock.readLock().unlock();
33983447
}
@@ -3408,13 +3457,20 @@ Engine getEngine() {
34083457
public Engine getEngineOrNull() {
34093458
engineResetLock.readLock().lock();
34103459
try {
3411-
return getCurrentEngine(true);
3460+
return getCurrentEngine(true, false);
34123461
} finally {
34133462
engineResetLock.readLock().unlock();
34143463
}
34153464
}
34163465

3417-
private Engine getCurrentEngine(boolean allowNoEngine) {
3466+
private Engine getCurrentEngine(boolean allowNoEngine, boolean skipAssertions) {
3467+
// We only reset a shard when it's relocating (primary relocation) or started (unhollowing)
3468+
boolean shardRoutingEngineResettable = shardRouting.started() || shardRouting.relocating();
3469+
assert skipAssertions
3470+
|| shardRouting.primary() == false // exclude non-primary shards. We only reset primary shards.
3471+
|| shardRoutingEngineResettable == false
3472+
|| state() != IndexShardState.STARTED // exclude getting the engine when not started. We only reset started shards.
3473+
|| assertCurrentThreadWithEngine("method IndexShard#getCurrentEngine (or one of its variant) can block");
34183474
assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */;
34193475
var engine = currentEngine.get();
34203476
if (engine == null && allowNoEngine == false) {
@@ -3482,12 +3538,12 @@ public <R> R withEngine(Function<Engine, R> operation) {
34823538
* @throws AlreadyClosedException if the current engine instance is {@code null}.
34833539
*/
34843540
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
3485-
assert assertCurrentThreadWithEngine();
3541+
assert assertCurrentThreadWithEngine("method IndexShard#withEngineException (or one of its variant) can block");
34863542
assert operation != null;
34873543

34883544
engineResetLock.readLock().lock();
34893545
try {
3490-
var engine = getCurrentEngine(false);
3546+
var engine = getCurrentEngine(false, false);
34913547
return operation.apply(engine);
34923548
} finally {
34933549
engineResetLock.readLock().unlock();
@@ -3526,7 +3582,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35263582
}
35273583
if (locked) {
35283584
try {
3529-
var engine = getCurrentEngine(allowNoEngine);
3585+
var engine = getCurrentEngine(allowNoEngine, noEngineDuringReset);
35303586
return operation.apply(engine);
35313587
} finally {
35323588
engineResetLock.readLock().unlock();
@@ -3536,8 +3592,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35363592
}
35373593
}
35383594

3539-
private static boolean assertCurrentThreadWithEngine() {
3540-
var message = "method IndexShard#withEngine (or one of its variant) can block";
3595+
private static boolean assertCurrentThreadWithEngine(String message) {
35413596
assert ClusterApplierService.assertNotClusterStateUpdateThread(message);
35423597
assert MasterService.assertNotMasterUpdateThread(message);
35433598
assert Transports.assertNotTransportThread(message);
@@ -4600,7 +4655,7 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
46004655
try {
46014656
engineResetLock.writeLock().lock();
46024657
try {
4603-
var engine = getCurrentEngine(false);
4658+
var engine = getCurrentEngine(false, true);
46044659
engine.prepareForEngineReset();
46054660
var newEngine = createEngine(newEngineConfig(replicationTracker));
46064661
getAndSetCurrentEngine(newEngine);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,9 @@ protected void asyncShardOperation(final Request request, final ShardId shardId,
403403
throws IOException {
404404
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
405405
final IndexShard indexShard = indexService.getShard(request.getShard().id());
406-
final SeqNoStats seqNoStats = indexShard.seqNoStats();
406+
// This may be problematic as it's called by the action. We need to think about it.
407+
// Setting skipAssertions=True to avoid this assertion failure.
408+
final SeqNoStats seqNoStats = indexShard.seqNoStats(true);
407409

408410
if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
409411
logger.trace(

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,8 @@ protected void adaptResponse(BulkShardOperationsResponse response, IndexShard in
292292
}
293293

294294
public static void adaptBulkShardOperationsResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
295-
final SeqNoStats seqNoStats = indexShard.seqNoStats();
295+
// Skip assertions since run with primary shard reference / operation permit
296+
final SeqNoStats seqNoStats = indexShard.seqNoStats(true);
296297
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
297298
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
298299
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());

x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsShardAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,9 @@ protected Response shardOperation(Request request, ShardId shardId) {
172172
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
173173
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
174174
final IndexShard indexShard = indexService.getShard(shardId.id());
175-
final SeqNoStats seqNoStats = indexShard.seqNoStats();
175+
// This may be problematic as it's called by the action. We need to think about it.
176+
// Setting skipAssertions=True to avoid this assertion failure.
177+
final SeqNoStats seqNoStats = indexShard.seqNoStats(true);
176178

177179
if (request.waitForAdvance() && request.checkpoint() >= seqNoStats.getGlobalCheckpoint()) {
178180
indexShard.addGlobalCheckpointListener(request.checkpoint() + 1, new GlobalCheckpointListeners.GlobalCheckpointListener() {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,20 @@ protected static void getGlobalCheckpoints(
9696
Arrays.fill(seqNumbers, SequenceNumbers.UNASSIGNED_SEQ_NO);
9797
return seqNumbers;
9898
});
99-
checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = indexShard.seqNoStats().getGlobalCheckpoint();
99+
// This may be problematic as it's called by
100+
// java.lang.AssertionError: Expected current thread [Thread[#117,elasticsearch[v9.2.0-2][transport_worker][T#25],5,main]]
101+
// to not be a transport thread. Reason: [method IndexShard#getCurrentEngine (or one of its variant) can block]
102+
// at org.elasticsearch.transport.Transports.assertNotTransportThread(Transports.java:68)
103+
// at org.elasticsearch.index.shard.IndexShard.assertCurrentThreadWithEngine(IndexShard.java:3599)
104+
// at org.elasticsearch.index.shard.IndexShard.getCurrentEngine(IndexShard.java:3477)
105+
// at org.elasticsearch.index.shard.IndexShard.getEngine(IndexShard.java:3448)
106+
// at org.elasticsearch.index.shard.IndexShard.getEngine(IndexShard.java:3441)
107+
// at org.elasticsearch.index.shard.IndexShard.seqNoStats(IndexShard.java:1403)
108+
// at org.elasticsearch.xpack.transform.action.TransportGetCheckpointNodeAction.getGlobalCheckpoints
109+
// (TransportGetCheckpointNodeAction.java:99) ~[?:?]
110+
// We need to think about it. Setting skipAssertions=True to avoid this assertion failure.
111+
checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = indexShard.seqNoStats(true)
112+
.getGlobalCheckpoint();
100113
++numProcessedShards;
101114
} catch (Exception e) {
102115
logger.atDebug()

0 commit comments

Comments
 (0)