@@ -1463,11 +1463,12 @@ public StoreStats storeStats() {
14631463 }
14641464
14651465 public MergeStats mergeStats () {
1466- final Engine engine = getEngineOrNull ();
1467- if (engine == null ) {
1468- return new MergeStats ();
1469- }
1470- return engine .getMergeStats ();
1466+ return tryWithEngineOrNull (engine -> {
1467+ if (engine == null ) {
1468+ return new MergeStats ();
1469+ }
1470+ return engine .getMergeStats ();
1471+ });
14711472 }
14721473
14731474 public SegmentsStats segmentStats (boolean includeSegmentFileSizes , boolean includeUnloadedSegments ) {
@@ -1715,14 +1716,14 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
17151716 public Engine .SearcherSupplier acquireSearcherSupplier (Engine .SearcherScope scope ) {
17161717 readAllowed ();
17171718 markSearcherAccessed ();
1718- final Engine engine = getEngine ();
1719+ final Engine engine = getEngine (true ); // skipAssertions=true bypasses the blocking-thread assertion; should primarily happen on search nodes
17191720 return engine .acquireSearcherSupplier (this ::wrapSearcher , scope );
17201721 }
17211722
17221723 public Engine .Searcher acquireSearcher (String source ) {
17231724 readAllowed ();
17241725 markSearcherAccessed ();
1725- final Engine engine = getEngine ();
1726+ final Engine engine = getEngine (true ); // skipAssertions=true bypasses the blocking-thread assertion; should primarily happen on search nodes
17261727 return engine .acquireSearcher (source , Engine .SearcherScope .EXTERNAL , this ::wrapSearcher );
17271728 }
17281729
@@ -2669,7 +2670,33 @@ boolean shouldRollTranslogGeneration() {
26692670 public void onSettingsChanged () {
26702671 engineResetLock .readLock ().lock ();
26712672 try {
2672- var engine = getCurrentEngine (true );
2673+ // TODO: this may be called by a cluster state update thread, and we need to consider whether that is an issue. Observed failure:
2674+ // java.lang.AssertionError: Expected current thread
2675+ // [Thread[#97,elasticsearch[node_t3][clusterApplierService#updateTask][T#1],5,TGRP-StatelessIT]] to not be the cluster state
2676+ // update thread. Reason: [method IndexShard#getCurrentEngine (or one of its variant) can block]
2677+ // at __randomizedtesting.SeedInfo.seed([6244501B70969C37]:0)
2678+ // at org.elasticsearch.cluster.service.ClusterApplierService.assertNotClusterStateUpdateThread(ClusterApplierService.java:386)
2679+ // at org.elasticsearch.index.shard.IndexShard.assertCurrentThreadWithEngine(IndexShard.java:3550)
2680+ // at org.elasticsearch.index.shard.IndexShard.getCurrentEngine(IndexShard.java:3430)
2681+ // at org.elasticsearch.index.shard.IndexShard.onSettingsChanged(IndexShard.java:2673)
2682+ // at org.elasticsearch.index.IndexService.updateMetadata(IndexService.java:1013)
2683+ // at org.elasticsearch.indices.cluster.IndicesClusterStateService.updateIndices(IndicesClusterStateService.java:662)
2684+ // at org.elasticsearch.indices.cluster.IndicesClusterStateService.doApplyClusterState(IndicesClusterStateService.java:322)
2685+ // at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:278)
2686+ // at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:572)
2687+ // at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:558)
2688+ // at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:531)
2689+ // at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:460)
2690+ // at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:159)
2691+ // at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:1000)
2692+ // at
2693+ // org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:218)
2694+ // at
2695+ // org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:184)
2696+ // at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1095)
2697+ // at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:619)
2698+ // at java.base/java.lang.Thread.run(Thread.java:1447)
2699+ var engine = getCurrentEngine (true , true );
26732700 if (engine != null ) {
26742701 engine .onSettingsChanged ();
26752702 }
@@ -3053,7 +3080,11 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
30533080 * @return the local checkpoint
30543081 */
30553082 public long getLocalCheckpoint () {
3056- return getEngine ().getPersistedLocalCheckpoint ();
3083+ return getLocalCheckpoint (false );
3084+ }
3085+
3086+ public long getLocalCheckpoint (boolean skipAssertions ) {
3087+ return getEngine (skipAssertions ).getPersistedLocalCheckpoint ();
30573088 }
30583089
30593090 /**
@@ -3158,7 +3189,7 @@ public PendingReplicationActions getPendingReplicationActions() {
31583189 */
31593190 public void updateGlobalCheckpointOnReplica (final long globalCheckpoint , final String reason ) {
31603191 assert assertReplicationTarget ();
3161- final long localCheckpoint = getLocalCheckpoint ();
3192+ final long localCheckpoint = getLocalCheckpoint (true );
31623193 if (globalCheckpoint > localCheckpoint ) {
31633194 /*
31643195 * This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
@@ -3398,9 +3429,14 @@ private void doCheckIndex() throws IOException {
33983429 */
33993430 @ Deprecated
34003431 Engine getEngine () {
3432+ return getEngine (false );
3433+ }
3434+
3435+ @ Deprecated
3436+ Engine getEngine (boolean skipAssertions ) {
34013437 engineResetLock .readLock ().lock ();
34023438 try {
3403- return getCurrentEngine (false );
3439+ return getCurrentEngine (false , skipAssertions );
34043440 } finally {
34053441 engineResetLock .readLock ().unlock ();
34063442 }
@@ -3416,13 +3452,20 @@ Engine getEngine() {
34163452 public Engine getEngineOrNull () {
34173453 engineResetLock .readLock ().lock ();
34183454 try {
3419- return getCurrentEngine (true );
3455+ return getCurrentEngine (true , false );
34203456 } finally {
34213457 engineResetLock .readLock ().unlock ();
34223458 }
34233459 }
34243460
3425- private Engine getCurrentEngine (boolean allowNoEngine ) {
3461+ private Engine getCurrentEngine (boolean allowNoEngine , boolean skipAssertions ) {
3462+ // We only reset a shard's engine when the shard is relocating (primary relocation) or started (unhollowing)
3463+ boolean shardRoutingEngineResettable = shardRouting .started () || shardRouting .relocating ();
3464+ assert skipAssertions
3465+ || shardRouting .primary () == false // exclude non-primary shards. We only reset primary shards.
3466+ || shardRoutingEngineResettable == false
3467+ || state () != IndexShardState .STARTED // exclude getting the engine when not started. We only reset started shards.
3468+ || assertCurrentThreadWithEngine ("method IndexShard#getCurrentEngine (or one of its variant) can block" );
34263469 assert engineResetLock .isReadLockedByCurrentThread () || engineResetLock .isWriteLockedByCurrentThread () /* for resets */ ;
34273470 var engine = currentEngine .get ();
34283471 if (engine == null && allowNoEngine == false ) {
@@ -3489,12 +3532,12 @@ public <R> R withEngine(Function<Engine, R> operation) {
34893532 * @throws AlreadyClosedException if the current engine instance is {@code null}.
34903533 */
34913534 public <R , E extends Exception > R withEngineException (CheckedFunction <Engine , R , E > operation ) throws E {
3492- assert assertCurrentThreadWithEngine ();
3535+ assert assertCurrentThreadWithEngine ("method IndexShard#withEngineException (or one of its variant) can block" );
34933536 assert operation != null ;
34943537
34953538 engineResetLock .readLock ().lock ();
34963539 try {
3497- var engine = getCurrentEngine (false );
3540+ var engine = getCurrentEngine (false , false );
34983541 return operation .apply (engine );
34993542 } finally {
35003543 engineResetLock .readLock ().unlock ();
@@ -3526,12 +3569,12 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35263569 if (noEngineDuringReset ) {
35273570 locked = engineResetLock .readLock ().tryLock ();
35283571 } else {
3529- assert assertCurrentThreadWithEngine ();
3572+ assert assertCurrentThreadWithEngine ("method IndexShard#withEngine (or one of its variant) can block" );
35303573 engineResetLock .readLock ().lock ();
35313574 }
35323575 if (locked ) {
35333576 try {
3534- var engine = getCurrentEngine (allowNoEngine );
3577+ var engine = getCurrentEngine (allowNoEngine , noEngineDuringReset );
35353578 return operation .apply (engine );
35363579 } finally {
35373580 engineResetLock .readLock ().unlock ();
@@ -3541,8 +3584,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
35413584 }
35423585 }
35433586
3544- private static boolean assertCurrentThreadWithEngine () {
3545- var message = "method IndexShard#withEngine (or one of its variant) can block" ;
3587+ private static boolean assertCurrentThreadWithEngine (String message ) {
35463588 assert ClusterApplierService .assertNotClusterStateUpdateThread (message );
35473589 assert MasterService .assertNotMasterUpdateThread (message );
35483590 assert Transports .assertNotTransportThread (message );
@@ -4605,7 +4647,7 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
46054647 try {
46064648 engineResetLock .writeLock ().lock ();
46074649 try {
4608- var engine = getCurrentEngine (false );
4650+ var engine = getCurrentEngine (false , true );
46094651 engine .prepareForEngineReset ();
46104652 var newEngine = createEngine (newEngineConfig (replicationTracker ));
46114653 getAndSetCurrentEngine (newEngine );
0 commit comments