Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 81 additions & 25 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,11 @@ public CommitStats commitStats() {
* @throws AlreadyClosedException if shard is closed
*/
public SeqNoStats seqNoStats() {
    return seqNoStats(false);
}

/**
 * Returns the sequence-number stats for this shard, computed against the current global checkpoint.
 *
 * @param skipAssertions whether to skip the current-thread blocking assertions performed when acquiring
 *                       the engine; callers that are known to be safe with respect to concurrent engine
 *                       resets (e.g. holding a primary permit) may pass {@code true}
 * @throws AlreadyClosedException if shard is closed
 */
public SeqNoStats seqNoStats(boolean skipAssertions) {
    return getEngine(skipAssertions).getSeqNoStats(replicationTracker.getGlobalCheckpoint());
}

public IndexingStats indexingStats() {
Expand Down Expand Up @@ -1463,11 +1467,12 @@ public StoreStats storeStats() {
}

public MergeStats mergeStats() {
    // Use tryWithEngineOrNull so that stats collection does not block behind an ongoing engine reset;
    // when no engine is currently available, report empty merge stats instead of failing.
    return tryWithEngineOrNull(engine -> {
        if (engine == null) {
            return new MergeStats();
        }
        return engine.getMergeStats();
    });
}

public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
Expand Down Expand Up @@ -1707,14 +1712,14 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
    readAllowed();
    markSearcherAccessed();
    // Skip the blocking-thread assertions when acquiring the engine; acquiring a searcher supplier
    // should primarily happen on search nodes.
    final Engine engine = getEngine(true);
    return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
}

public Engine.Searcher acquireSearcher(String source) {
    readAllowed();
    markSearcherAccessed();
    // Skip the blocking-thread assertions when acquiring the engine; acquiring a searcher
    // should primarily happen on search nodes.
    final Engine engine = getEngine(true);
    return engine.acquireSearcher(source, Engine.SearcherScope.EXTERNAL, this::wrapSearcher);
}

Expand Down Expand Up @@ -2661,7 +2666,35 @@ boolean shouldRollTranslogGeneration() {
public void onSettingsChanged() {
engineResetLock.readLock().lock();
try {
var engine = getCurrentEngine(true);
// TODO this may be called by a cluster state update thread and we need to consider whether it is an issue
// java.lang.AssertionError: Expected current thread
// [Thread[#97,elasticsearch[node_t3][clusterApplierService#updateTask][T#1],5,TGRP-StatelessIT]] to not be the cluster state
// update thread. Reason: [method IndexShard#getCurrentEngine (or one of its variant) can block]
// at __randomizedtesting.SeedInfo.seed([6244501B70969C37]:0)
// at org.elasticsearch.cluster.service.ClusterApplierService.assertNotClusterStateUpdateThread(ClusterApplierService.java:386)
// at org.elasticsearch.index.shard.IndexShard.assertCurrentThreadWithEngine(IndexShard.java:3550)
// at org.elasticsearch.index.shard.IndexShard.getCurrentEngine(IndexShard.java:3430)
// at org.elasticsearch.index.shard.IndexShard.onSettingsChanged(IndexShard.java:2673)
// at org.elasticsearch.index.IndexService.updateMetadata(IndexService.java:1013)
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.updateIndices(IndicesClusterStateService.java:662)
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.doApplyClusterState(IndicesClusterStateService.java:322)
// at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:278)
// at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:572)
// at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:558)
// at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:531)
// at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:460)
// at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:159)
// at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:1000)
// at
// org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean
// (PrioritizedEsThreadPoolExecutor.java:218)
// at
// org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run
// (PrioritizedEsThreadPoolExecutor.java:184)
// at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1095)
// at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:619)
// at java.base/java.lang.Thread.run(Thread.java:1447)
var engine = getCurrentEngine(true, true);
if (engine != null) {
engine.onSettingsChanged();
}
Expand All @@ -2674,15 +2707,18 @@ public void onSettingsChanged() {
* Acquires a lock on Lucene soft-deleted documents to prevent them from being trimmed
*/
public Closeable acquireHistoryRetentionLock() {
    // Skip assertions since this is called either during recovery, or under a primary permit (thus during ingestion) which is not
    // something that can happen concurrently with hollowing (primary relocation holds permits) or unhollowing (we block ingestion).
    return getEngine(true).acquireHistoryRetentionLock();
}

/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
*/
public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) {
    // Skip assertions since this is called during recovery.
    return getEngine(true).hasCompleteOperationHistory(reason, startingSeqNo);
}

/**
Expand All @@ -2691,7 +2727,8 @@ public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) {
* @return the minimum retained sequence number
*/
public long getMinRetainedSeqNo() {
    // Skip assertions since this is called under a primary permit.
    return getEngine(true).getMinRetainedSeqNo();
}

/**
Expand Down Expand Up @@ -3045,7 +3082,11 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
* @return the local checkpoint
*/
public long getLocalCheckpoint() {
    return getLocalCheckpoint(false);
}

/**
 * Returns the persisted local checkpoint for the shard.
 *
 * @param skipAssertions whether to skip the current-thread blocking assertions performed when acquiring
 *                       the engine; callers known to be safe with respect to concurrent engine resets
 *                       may pass {@code true}
 * @return the local checkpoint
 */
public long getLocalCheckpoint(boolean skipAssertions) {
    return getEngine(skipAssertions).getPersistedLocalCheckpoint();
}

/**
Expand Down Expand Up @@ -3087,7 +3128,10 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
}
assert assertPrimaryMode();
// only sync if there are no operations in flight, or when using async durability
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
// Skip assertions because this is called either from an async thread (so OK to block) or from the post operation of a replication
// action under a primary permit -- which would not be concurrent with either hollowing/unhollowing (since ingestion is blocked
// before hollowing, due to the primary relocation, and due to our own blocker during unhollowing).
final SeqNoStats stats = getEngine(true).getSeqNoStats(replicationTracker.getGlobalCheckpoint());
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync();
Expand Down Expand Up @@ -3150,7 +3194,7 @@ public PendingReplicationActions getPendingReplicationActions() {
*/
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
assert assertReplicationTarget();
final long localCheckpoint = getLocalCheckpoint();
final long localCheckpoint = getLocalCheckpoint(true);
if (globalCheckpoint > localCheckpoint) {
/*
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
Expand Down Expand Up @@ -3390,9 +3434,14 @@ private void doCheckIndex() throws IOException {
*/
@Deprecated
Engine getEngine() {
return getEngine(false);
}

@Deprecated
Engine getEngine(boolean skipAssertions) {
engineResetLock.readLock().lock();
try {
return getCurrentEngine(false);
return getCurrentEngine(false, skipAssertions);
} finally {
engineResetLock.readLock().unlock();
}
Expand All @@ -3408,13 +3457,20 @@ Engine getEngine() {
public Engine getEngineOrNull() {
    // Acquire the reset read lock so we never observe an engine that is in the middle of being reset;
    // allowNoEngine=true means a null engine is returned rather than throwing.
    engineResetLock.readLock().lock();
    try {
        return getCurrentEngine(true, false);
    } finally {
        engineResetLock.readLock().unlock();
    }
}

private Engine getCurrentEngine(boolean allowNoEngine) {
private Engine getCurrentEngine(boolean allowNoEngine, boolean skipAssertions) {
// We only reset a shard when it's relocating (primary relocation) or started (unhollowing)
boolean shardRoutingEngineResettable = shardRouting.started() || shardRouting.relocating();
assert skipAssertions
|| shardRouting.primary() == false // exclude non-primary shards. We only reset primary shards.
|| shardRoutingEngineResettable == false
|| state() != IndexShardState.STARTED // exclude getting the engine when not started. We only reset started shards.
|| assertCurrentThreadWithEngine("method IndexShard#getCurrentEngine (or one of its variant) can block");
assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */;
var engine = currentEngine.get();
if (engine == null && allowNoEngine == false) {
Expand Down Expand Up @@ -3482,12 +3538,12 @@ public <R> R withEngine(Function<Engine, R> operation) {
* @throws AlreadyClosedException if the current engine instance is {@code null}.
*/
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
assert assertCurrentThreadWithEngine();
assert assertCurrentThreadWithEngine("method IndexShard#withEngineException (or one of its variant) can block");
assert operation != null;

engineResetLock.readLock().lock();
try {
var engine = getCurrentEngine(false);
var engine = getCurrentEngine(false, false);
return operation.apply(engine);
} finally {
engineResetLock.readLock().unlock();
Expand All @@ -3513,7 +3569,8 @@ public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R,
*/
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, boolean blockIfResetting) {
assert operation != null;
assert blockIfResetting == false || assertCurrentThreadWithEngine(); // assert current thread can block on engine resets
// assert current thread can block on engine resets
assert blockIfResetting == false || assertCurrentThreadWithEngine("method IndexShard#withEngine (or one of its variant) can block");
boolean locked = true;
if (blockIfResetting) {
engineResetLock.readLock().lock();
Expand All @@ -3526,7 +3583,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
}
if (locked) {
try {
var engine = getCurrentEngine(allowNoEngine);
var engine = getCurrentEngine(allowNoEngine, blockIfResetting == false);
return operation.apply(engine);
} finally {
engineResetLock.readLock().unlock();
Expand All @@ -3536,8 +3593,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, b
}
}

private static boolean assertCurrentThreadWithEngine() {
var message = "method IndexShard#withEngine (or one of its variant) can block";
private static boolean assertCurrentThreadWithEngine(String message) {
assert ClusterApplierService.assertNotClusterStateUpdateThread(message);
assert MasterService.assertNotMasterUpdateThread(message);
assert Transports.assertNotTransportThread(message);
Expand Down Expand Up @@ -4600,7 +4656,7 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
try {
engineResetLock.writeLock().lock();
try {
var engine = getCurrentEngine(false);
var engine = getCurrentEngine(false, true);
engine.prepareForEngineReset();
var newEngine = createEngine(newEngineConfig(replicationTracker));
getAndSetCurrentEngine(newEngine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ protected void asyncShardOperation(final Request request, final ShardId shardId,
throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
// This may be problematic as it's called by the action. We need to think about it.
// Setting skipAssertions=true to avoid this assertion failure.
final SeqNoStats seqNoStats = indexShard.seqNoStats(true);

if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ protected void adaptResponse(BulkShardOperationsResponse response, IndexShard in
}

public static void adaptBulkShardOperationsResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
final SeqNoStats seqNoStats = indexShard.seqNoStats();
// Skip assertions since run with primary shard reference / operation permit
final SeqNoStats seqNoStats = indexShard.seqNoStats(true);
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ protected Response shardOperation(Request request, ShardId shardId) {
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
// This may be problematic as it's called by the action. We need to think about it.
// Setting skipAssertions=true to avoid this assertion failure.
final SeqNoStats seqNoStats = indexShard.seqNoStats(true);

if (request.waitForAdvance() && request.checkpoint() >= seqNoStats.getGlobalCheckpoint()) {
indexShard.addGlobalCheckpointListener(request.checkpoint() + 1, new GlobalCheckpointListeners.GlobalCheckpointListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.LicenseStateListener;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashSet;
import java.util.Set;
Expand All @@ -34,21 +35,28 @@ public final class FailShardsOnInvalidLicenseClusterListener implements LicenseS

private final RerouteService rerouteService;

private final ThreadPool threadPool;

final Set<IndexShard> shardsToFail = new HashSet<>();

private boolean allowed;

public FailShardsOnInvalidLicenseClusterListener(XPackLicenseState xPackLicenseState, RerouteService rerouteService) {
public FailShardsOnInvalidLicenseClusterListener(
XPackLicenseState xPackLicenseState,
RerouteService rerouteService,
ThreadPool threadPool
) {
this.xPackLicenseState = xPackLicenseState;
this.rerouteService = rerouteService;
this.threadPool = threadPool;
this.allowed = ARCHIVE_FEATURE.checkWithoutTracking(xPackLicenseState);
xPackLicenseState.addListener(this);
}

@Override
public synchronized void afterIndexShardStarted(IndexShard indexShard) {
shardsToFail.add(indexShard);
failActiveShardsIfNecessary();
threadPool.generic().execute(() -> failActiveShardsIfNecessary());
}

@Override
Expand All @@ -75,11 +83,10 @@ public void onFailure(Exception e) {
});
}
this.allowed = allowed;
failActiveShardsIfNecessary();
threadPool.generic().execute(() -> failActiveShardsIfNecessary());
}

private void failActiveShardsIfNecessary() {
assert Thread.holdsLock(this);
private synchronized void failActiveShardsIfNecessary() {
if (allowed == false) {
for (IndexShard indexShard : shardsToFail) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ public class OldLuceneVersions extends Plugin implements IndexStorePlugin, Clust
public Collection<?> createComponents(PluginServices services) {
ThreadPool threadPool = services.threadPool();

this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService()));
this.failShardsListener.set(
new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService(), threadPool)
);
if (DiscoveryNode.isMasterNode(services.environment().settings())) {
// We periodically look through the indices and identify if there are any archive indices,
// then marking the feature as used. We do this on each master node so that if one master fails, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ public Collection<?> createComponents(PluginServices services) {
final List<Object> components = new ArrayList<>();
this.repositoriesService.set(services.repositoriesService());
this.threadPool.set(threadPool);
this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService()));
this.failShardsListener.set(
new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), services.rerouteService(), threadPool)
);
if (DiscoveryNode.canContainData(settings)) {
final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment));
this.cacheService.set(cacheService);
Expand Down
Loading