diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1220e706ebfc7..fa78a497e4bde 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1400,7 +1400,11 @@ public CommitStats commitStats() { * @throws AlreadyClosedException if shard is closed */ public SeqNoStats seqNoStats() { - return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); + return seqNoStats(false); + } + + public SeqNoStats seqNoStats(boolean skipAssertions) { + return getEngine(skipAssertions).getSeqNoStats(replicationTracker.getGlobalCheckpoint()); } public IndexingStats indexingStats() { @@ -1463,11 +1467,12 @@ public StoreStats storeStats() { } public MergeStats mergeStats() { - final Engine engine = getEngineOrNull(); - if (engine == null) { - return new MergeStats(); - } - return engine.getMergeStats(); + return tryWithEngineOrNull(engine -> { + if (engine == null) { + return new MergeStats(); + } + return engine.getMergeStats(); + }); } public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { @@ -1707,14 +1712,14 @@ public Engine.SearcherSupplier acquireSearcherSupplier() { public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) { readAllowed(); markSearcherAccessed(); - final Engine engine = getEngine(); + final Engine engine = getEngine(true); // should primarily happen on search nodes return engine.acquireSearcherSupplier(this::wrapSearcher, scope); } public Engine.Searcher acquireSearcher(String source) { readAllowed(); markSearcherAccessed(); - final Engine engine = getEngine(); + final Engine engine = getEngine(true); // should primarily happen on search nodes return engine.acquireSearcher(source, Engine.SearcherScope.EXTERNAL, this::wrapSearcher); } @@ -2661,7 +2666,35 @@ boolean 
shouldRollTranslogGeneration() { public void onSettingsChanged() { engineResetLock.readLock().lock(); try { - var engine = getCurrentEngine(true); + // TODO this may be called by a cluster state update thread and we need to consider whether it is an issue + // java.lang.AssertionError: Expected current thread + // [Thread[#97,elasticsearch[node_t3][clusterApplierService#updateTask][T#1],5,TGRP-StatelessIT]] to not be the cluster state + // update thread. Reason: [method IndexShard#getCurrentEngine (or one of its variant) can block] + // at __randomizedtesting.SeedInfo.seed([6244501B70969C37]:0) + // at org.elasticsearch.cluster.service.ClusterApplierService.assertNotClusterStateUpdateThread(ClusterApplierService.java:386) + // at org.elasticsearch.index.shard.IndexShard.assertCurrentThreadWithEngine(IndexShard.java:3550) + // at org.elasticsearch.index.shard.IndexShard.getCurrentEngine(IndexShard.java:3430) + // at org.elasticsearch.index.shard.IndexShard.onSettingsChanged(IndexShard.java:2673) + // at org.elasticsearch.index.IndexService.updateMetadata(IndexService.java:1013) + // at org.elasticsearch.indices.cluster.IndicesClusterStateService.updateIndices(IndicesClusterStateService.java:662) + // at org.elasticsearch.indices.cluster.IndicesClusterStateService.doApplyClusterState(IndicesClusterStateService.java:322) + // at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:278) + // at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:572) + // at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:558) + // at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:531) + // at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:460) + // at 
org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:159) + // at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:1000) + // at + // org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean + // (PrioritizedEsThreadPoolExecutor.java:218) + // at + // org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run + // (PrioritizedEsThreadPoolExecutor.java:184) + // at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1095) + // at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:619) + // at java.base/java.lang.Thread.run(Thread.java:1447) + var engine = getCurrentEngine(true, true); if (engine != null) { engine.onSettingsChanged(); } @@ -2674,7 +2707,9 @@ public void onSettingsChanged() { * Acquires a lock on Lucene soft-deleted documents to prevent them from being trimmed */ public Closeable acquireHistoryRetentionLock() { - return getEngine().acquireHistoryRetentionLock(); + // Skip assertions since this is called either during recovery, or under a primary permit (thus during ingestion) which is not + // something that can happen concurrently with hollowing (primary relocation holds permits) or unhollowing (we block ingestion). + return getEngine(true).acquireHistoryRetentionLock(); } /** @@ -2682,7 +2717,8 @@ public Closeable acquireHistoryRetentionLock() { * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()} */ public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) { - return getEngine().hasCompleteOperationHistory(reason, startingSeqNo); + // Skip assertions since this is called during recovery. 
+ return getEngine(true).hasCompleteOperationHistory(reason, startingSeqNo); } /** @@ -2691,7 +2727,8 @@ public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) { * @return the minimum retained sequence number */ public long getMinRetainedSeqNo() { - return getEngine().getMinRetainedSeqNo(); + // Skip assertions since this is called under a primary permit + return getEngine(true).getMinRetainedSeqNo(); } /** @@ -3045,7 +3082,11 @@ public void markAllocationIdAsInSync(final String allocationId, final long local * @return the local checkpoint */ public long getLocalCheckpoint() { - return getEngine().getPersistedLocalCheckpoint(); + return getLocalCheckpoint(false); + } + + public long getLocalCheckpoint(boolean skipAssertions) { + return getEngine(skipAssertions).getPersistedLocalCheckpoint(); } /** @@ -3087,7 +3128,10 @@ public void maybeSyncGlobalCheckpoint(final String reason) { } assert assertPrimaryMode(); // only sync if there are no operations in flight, or when using async durability - final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); + // Skip assertions because this is called either from an async thread (so OK to block) or from the post operation of a replication + // action under a primary permit -- which would not be concurrent with either hollowing/unhollowing (since ingestion is blocked + // before hollowing, due to the primary relocation, and due to our own blocker during unhollowing). 
+ final SeqNoStats stats = getEngine(true).getSeqNoStats(replicationTracker.getGlobalCheckpoint()); final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC; if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) { final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync(); @@ -3150,7 +3194,7 @@ public PendingReplicationActions getPendingReplicationActions() { */ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) { assert assertReplicationTarget(); - final long localCheckpoint = getLocalCheckpoint(); + final long localCheckpoint = getLocalCheckpoint(true); if (globalCheckpoint > localCheckpoint) { /* * This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global @@ -3390,9 +3434,14 @@ private void doCheckIndex() throws IOException { */ @Deprecated Engine getEngine() { + return getEngine(false); + } + + @Deprecated + Engine getEngine(boolean skipAssertions) { engineResetLock.readLock().lock(); try { - return getCurrentEngine(false); + return getCurrentEngine(false, skipAssertions); } finally { engineResetLock.readLock().unlock(); } @@ -3408,13 +3457,20 @@ Engine getEngine() { public Engine getEngineOrNull() { engineResetLock.readLock().lock(); try { - return getCurrentEngine(true); + return getCurrentEngine(true, false); } finally { engineResetLock.readLock().unlock(); } } - private Engine getCurrentEngine(boolean allowNoEngine) { + private Engine getCurrentEngine(boolean allowNoEngine, boolean skipAssertions) { + // We only reset a shard when it's relocating (primary relocation) or started (unhollowing) + boolean shardRoutingEngineResettable = shardRouting.started() || shardRouting.relocating(); + assert skipAssertions + || shardRouting.primary() == false // exclude non-primary shards. We only reset primary shards. 
+ || shardRoutingEngineResettable == false + || state() != IndexShardState.STARTED // exclude getting the engine when not started. We only reset started shards. + || assertCurrentThreadWithEngine("method IndexShard#getCurrentEngine (or one of its variant) can block"); assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */; var engine = currentEngine.get(); if (engine == null && allowNoEngine == false) { @@ -3482,12 +3538,12 @@ public R withEngine(Function operation) { * @throws AlreadyClosedException if the current engine instance is {@code null}. */ public R withEngineException(CheckedFunction operation) throws E { - assert assertCurrentThreadWithEngine(); + assert assertCurrentThreadWithEngine("method IndexShard#withEngineException (or one of its variant) can block"); assert operation != null; engineResetLock.readLock().lock(); try { - var engine = getCurrentEngine(false); + var engine = getCurrentEngine(false, false); return operation.apply(engine); } finally { engineResetLock.readLock().unlock(); @@ -3513,7 +3569,8 @@ public R withEngineException(CheckedFunction R withEngine(Function operation, boolean allowNoEngine, boolean blockIfResetting) { assert operation != null; - assert blockIfResetting == false || assertCurrentThreadWithEngine(); // assert current thread can block on engine resets + // assert current thread can block on engine resets + assert blockIfResetting == false || assertCurrentThreadWithEngine("method IndexShard#withEngine (or one of its variant) can block"); boolean locked = true; if (blockIfResetting) { engineResetLock.readLock().lock(); @@ -3526,7 +3583,7 @@ private R withEngine(Function operation, boolean allowNoEngine, b } if (locked) { try { - var engine = getCurrentEngine(allowNoEngine); + var engine = getCurrentEngine(allowNoEngine, blockIfResetting == false); return operation.apply(engine); } finally { engineResetLock.readLock().unlock(); @@ -3536,8 +3593,7 @@ private R 
withEngine(Function operation, boolean allowNoEngine, b } } - private static boolean assertCurrentThreadWithEngine() { - var message = "method IndexShard#withEngine (or one of its variant) can block"; + private static boolean assertCurrentThreadWithEngine(String message) { assert ClusterApplierService.assertNotClusterStateUpdateThread(message); assert MasterService.assertNotMasterUpdateThread(message); assert Transports.assertNotTransportThread(message); @@ -4600,7 +4656,7 @@ public void resetEngine(Consumer postResetNewEngineConsumer) { try { engineResetLock.writeLock().lock(); try { - var engine = getCurrentEngine(false); + var engine = getCurrentEngine(false, true); engine.prepareForEngineReset(); var newEngine = createEngine(newEngineConfig(replicationTracker)); getAndSetCurrentEngine(newEngine); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 0fef8e13dff08..47a7a94e25078 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -403,7 +403,9 @@ protected void asyncShardOperation(final Request request, final ShardId shardId, throws IOException { final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); final IndexShard indexShard = indexService.getShard(request.getShard().id()); - final SeqNoStats seqNoStats = indexShard.seqNoStats(); + // This may be problematic as it's called by the action. We need to think about it. + // Setting skipAssertions=True to avoid this assertion failure. 
+ final SeqNoStats seqNoStats = indexShard.seqNoStats(true); if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { logger.trace( diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 9a6e187e5544e..30e14a6cac65d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -292,7 +292,8 @@ protected void adaptResponse(BulkShardOperationsResponse response, IndexShard in } public static void adaptBulkShardOperationsResponse(BulkShardOperationsResponse response, IndexShard indexShard) { - final SeqNoStats seqNoStats = indexShard.seqNoStats(); + // Skip assertions since run with primary shard reference / operation permit + final SeqNoStats seqNoStats = indexShard.seqNoStats(true); // return a fresh global checkpoint after the operations have been replicated for the shard follow task response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint()); response.setMaxSeqNo(seqNoStats.getMaxSeqNo()); diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsShardAction.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsShardAction.java index a68dce31744ce..d099771eaa263 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsShardAction.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsShardAction.java @@ -172,7 +172,9 @@ protected Response shardOperation(Request request, ShardId shardId) { protected void asyncShardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException { final 
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); - final SeqNoStats seqNoStats = indexShard.seqNoStats(); + // This may be problematic as it's called by the action. We need to think about it. + // Setting skipAssertions=True to avoid this assertion failure. + final SeqNoStats seqNoStats = indexShard.seqNoStats(true); if (request.waitForAdvance() && request.checkpoint() >= seqNoStats.getGlobalCheckpoint()) { indexShard.addGlobalCheckpointListener(request.checkpoint() + 1, new GlobalCheckpointListeners.GlobalCheckpointListener() { diff --git a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/FailShardsOnInvalidLicenseClusterListener.java b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/FailShardsOnInvalidLicenseClusterListener.java index c9c5a7c7a4861..ed233fa8cda28 100644 --- a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/FailShardsOnInvalidLicenseClusterListener.java +++ b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/FailShardsOnInvalidLicenseClusterListener.java @@ -20,6 +20,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.LicenseStateListener; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.threadpool.ThreadPool; import java.util.HashSet; import java.util.Set; @@ -34,13 +35,20 @@ public final class FailShardsOnInvalidLicenseClusterListener implements LicenseS private final RerouteService rerouteService; + private final ThreadPool threadPool; + final Set shardsToFail = new HashSet<>(); private boolean allowed; - public FailShardsOnInvalidLicenseClusterListener(XPackLicenseState xPackLicenseState, RerouteService rerouteService) { + public FailShardsOnInvalidLicenseClusterListener( + XPackLicenseState xPackLicenseState, + RerouteService rerouteService, + 
ThreadPool threadPool + ) { this.xPackLicenseState = xPackLicenseState; this.rerouteService = rerouteService; + this.threadPool = threadPool; this.allowed = ARCHIVE_FEATURE.checkWithoutTracking(xPackLicenseState); xPackLicenseState.addListener(this); } @@ -48,7 +56,7 @@ public FailShardsOnInvalidLicenseClusterListener(XPackLicenseState xPackLicenseS @Override public synchronized void afterIndexShardStarted(IndexShard indexShard) { shardsToFail.add(indexShard); - failActiveShardsIfNecessary(); + threadPool.generic().execute(() -> failActiveShardsIfNecessary()); } @Override @@ -75,11 +83,10 @@ public void onFailure(Exception e) { }); } this.allowed = allowed; - failActiveShardsIfNecessary(); + threadPool.generic().execute(() -> failActiveShardsIfNecessary()); } - private void failActiveShardsIfNecessary() { - assert Thread.holdsLock(this); + private synchronized void failActiveShardsIfNecessary() { if (allowed == false) { for (IndexShard indexShard : shardsToFail) { try { diff --git a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java index 095a2328b69d9..72f1016165db3 100644 --- a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java +++ b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java @@ -79,7 +79,9 @@ public class OldLuceneVersions extends Plugin implements IndexStorePlugin, Clust public Collection createComponents(PluginServices services) { ThreadPool threadPool = services.threadPool(); - this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService())); + this.failShardsListener.set( + new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService(), threadPool) + ); if 
(DiscoveryNode.isMasterNode(services.environment().settings())) { // We periodically look through the indices and identify if there are any archive indices, // then marking the feature as used. We do this on each master node so that if one master fails, the diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index cb4d0f5eeda37..bafb670544667 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -319,7 +319,9 @@ public Collection createComponents(PluginServices services) { final List components = new ArrayList<>(); this.repositoriesService.set(services.repositoriesService()); this.threadPool.set(threadPool); - this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService())); + this.failShardsListener.set( + new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService(), threadPool) + ); if (DiscoveryNode.canContainData(settings)) { final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment)); this.cacheService.set(cacheService); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/FailShardsOnInvalidLicenseClusterListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/FailShardsOnInvalidLicenseClusterListener.java index 21d2297f57962..0a6a9f56795d4 100644 --- 
a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/FailShardsOnInvalidLicenseClusterListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/FailShardsOnInvalidLicenseClusterListener.java @@ -20,6 +20,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.LicenseStateListener; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.threadpool.ThreadPool; import java.util.HashSet; import java.util.Set; @@ -34,13 +35,20 @@ public class FailShardsOnInvalidLicenseClusterListener implements LicenseStateLi private final RerouteService rerouteService; + private final ThreadPool threadPool; + final Set shardsToFail = new HashSet<>(); private boolean allowed; - public FailShardsOnInvalidLicenseClusterListener(XPackLicenseState xPackLicenseState, RerouteService rerouteService) { + public FailShardsOnInvalidLicenseClusterListener( + XPackLicenseState xPackLicenseState, + RerouteService rerouteService, + ThreadPool threadPool + ) { this.xPackLicenseState = xPackLicenseState; this.rerouteService = rerouteService; + this.threadPool = threadPool; this.allowed = SEARCHABLE_SNAPSHOT_FEATURE.checkWithoutTracking(xPackLicenseState); xPackLicenseState.addListener(this); } @@ -48,7 +56,7 @@ public FailShardsOnInvalidLicenseClusterListener(XPackLicenseState xPackLicenseS @Override public synchronized void afterIndexShardStarted(IndexShard indexShard) { shardsToFail.add(indexShard); - failActiveShardsIfNecessary(); + threadPool.generic().execute(() -> failActiveShardsIfNecessary()); } @Override @@ -75,11 +83,10 @@ public void onFailure(Exception e) { }); } this.allowed = isAllowed; - failActiveShardsIfNecessary(); + threadPool.generic().execute(() -> failActiveShardsIfNecessary()); } - private void failActiveShardsIfNecessary() { - assert Thread.holdsLock(this); + private synchronized void 
failActiveShardsIfNecessary() { if (allowed == false) { for (IndexShard indexShard : shardsToFail) { try { diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java index a3c4776543350..5b3b90e1e415e 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java @@ -247,7 +247,7 @@ class TestTransportGetCheckpointNodeAction extends TransportGetCheckpointNodeAct IndexShard mockIndexShard = mock(IndexShard.class); when(mockIndexService.getShard(j)).thenReturn(mockIndexShard); SeqNoStats seqNoStats = new SeqNoStats(42 + i + j, 42 + i + j, 42 + i + j); - when(mockIndexShard.seqNoStats()).thenReturn(seqNoStats); + when(mockIndexShard.seqNoStats(true)).thenReturn(seqNoStats); } when(mockIndicesService.indexServiceSafe(indexMeta.getIndex())).thenReturn(mockIndexService); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java index 63b9dc7ae3fbb..d77620af462bb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java @@ -96,7 +96,20 @@ protected static void getGlobalCheckpoints( Arrays.fill(seqNumbers, SequenceNumbers.UNASSIGNED_SEQ_NO); return seqNumbers; }); - checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = 
indexShard.seqNoStats().getGlobalCheckpoint(); + // This may be problematic as it's called from a transport thread, which trips this assertion: + // java.lang.AssertionError: Expected current thread [Thread[#117,elasticsearch[v9.2.0-2][transport_worker][T#25],5,main]] + // to not be a transport thread. Reason: [method IndexShard#getCurrentEngine (or one of its variant) can block] + // at org.elasticsearch.transport.Transports.assertNotTransportThread(Transports.java:68) + // at org.elasticsearch.index.shard.IndexShard.assertCurrentThreadWithEngine(IndexShard.java:3599) + // at org.elasticsearch.index.shard.IndexShard.getCurrentEngine(IndexShard.java:3477) + // at org.elasticsearch.index.shard.IndexShard.getEngine(IndexShard.java:3448) + // at org.elasticsearch.index.shard.IndexShard.getEngine(IndexShard.java:3441) + // at org.elasticsearch.index.shard.IndexShard.seqNoStats(IndexShard.java:1403) + // at org.elasticsearch.xpack.transform.action.TransportGetCheckpointNodeAction.getGlobalCheckpoints + // (TransportGetCheckpointNodeAction.java:99) ~[?:?] + // TODO: we need to think about it. Until then, pass skipAssertions=true to avoid this assertion failure. 
+ checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = indexShard.seqNoStats(true) + .getGlobalCheckpoint(); ++numProcessedShards; } catch (Exception e) { logger.atDebug() diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeActionTests.java index 6b82c93a61752..d0164b42768a1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeActionTests.java @@ -116,13 +116,13 @@ private void testGetGlobalCheckpointsSuccess(TimeValue timeout) throws Interrupt private void mockIndexServiceResponse() { IndexShard indexShardA0 = mock(IndexShard.class); - when(indexShardA0.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 3_000)); + when(indexShardA0.seqNoStats(true)).thenReturn(new SeqNoStats(3_000, 2_000, 3_000)); IndexShard indexShardA1 = mock(IndexShard.class); - when(indexShardA1.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 3_001)); + when(indexShardA1.seqNoStats(true)).thenReturn(new SeqNoStats(3_000, 2_000, 3_001)); IndexShard indexShardB0 = mock(IndexShard.class); - when(indexShardB0.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 4_000)); + when(indexShardB0.seqNoStats(true)).thenReturn(new SeqNoStats(3_000, 2_000, 4_000)); IndexShard indexShardB1 = mock(IndexShard.class); - when(indexShardB1.seqNoStats()).thenReturn(new SeqNoStats(3_000, 2_000, 4_001)); + when(indexShardB1.seqNoStats(true)).thenReturn(new SeqNoStats(3_000, 2_000, 4_001)); Settings commonIndexSettings = Settings.builder() .put(SETTING_VERSION_CREATED, 1_000_000) .put(SETTING_NUMBER_OF_SHARDS, 2)