diff --git a/docs/changelog/126581.yaml b/docs/changelog/126581.yaml new file mode 100644 index 0000000000000..53fcb8a6057b3 --- /dev/null +++ b/docs/changelog/126581.yaml @@ -0,0 +1,10 @@ +pr: 126581 +summary: "Optimize shared blob cache evictions on shard removal + Shared blob cache evictions occur on the cluster applier thread when shards are + removed from a node. These can be expensive if a large number of shards are + being removed. This change uses the context of the removal to avoid unnecessary + evictions that might hold up the applier thread. + " +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java index 21b3e2831f216..ba5f250c91599 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/plugins/IndexFoldersDeletionListenerIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -345,12 +346,22 @@ public static class IndexFoldersDeletionListenerPlugin extends Plugin implements public List getIndexFoldersDeletionListeners() { return List.of(new IndexFoldersDeletionListener() { @Override - public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) { + public void beforeIndexFoldersDeleted( + Index index, + IndexSettings indexSettings, + Path[] indexPaths, + IndexRemovalReason reason + ) { deletedIndices.add(index); } @Override - public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) { + public void beforeShardFoldersDeleted( + ShardId shardId, + IndexSettings indexSettings, + Path[] shardPaths, + IndexRemovalReason reason + ) { deletedShards.computeIfAbsent(shardId.getIndex(), i -> Collections.synchronizedList(new ArrayList<>())).add(shardId); } }); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java index d523f233e876a..5bb0aa4e73434 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java @@ -48,7 +48,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.NO_LONGER_ASSIGNED; /** * Service responsible for submitting add and remove aliases requests diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 20bafdc931274..b688177611f91 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -83,7 
+83,7 @@ import static java.util.Collections.emptyMap; import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.validateTimestampFieldMapping; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.NO_LONGER_ASSIGNED; /** * Service responsible for submitting index templates updates diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 03bbf77b66046..1947bcd6e148f 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -21,7 +21,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index cdf1ad177f1e4..d5c00294aa6b8 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -83,6 +83,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoveryState; @@ -494,7 +495,12 @@ public synchronized IndexShard createShard( nodeEnv, lock, this.indexSettings, - shardPaths -> indexFoldersDeletionListener.beforeShardFoldersDeleted(shardId, this.indexSettings, shardPaths) + shardPaths -> indexFoldersDeletionListener.beforeShardFoldersDeleted( + shardId, + this.indexSettings, + shardPaths, + IndexRemovalReason.FAILURE + ) ); path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); } catch (Exception inner) { @@ -704,11 +710,11 @@ private void onShardClose(ShardLock lock) { try { eventListener.beforeIndexShardDeleted(lock.getShardId(), indexSettings.getSettings()); } finally { - shardStoreDeleter.deleteShardStore("delete index", lock, indexSettings); + shardStoreDeleter.deleteShardStore("delete index", lock, indexSettings, IndexRemovalReason.DELETED); eventListener.afterIndexShardDeleted(lock.getShardId(), indexSettings.getSettings()); } } catch (IOException e) { - shardStoreDeleter.addPendingDelete(lock.getShardId(), indexSettings); + shardStoreDeleter.addPendingDelete(lock.getShardId(), indexSettings, IndexRemovalReason.DELETED); logger.debug(() -> "[" + lock.getShardId().id() + "] failed to delete shard content - scheduled a retry", e); } } @@ -1062,9 +1068,9 @@ public static Function dateMathExpressionResolverAt(long instant } public interface ShardStoreDeleter { - void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException; + void deleteShardStore(String reasonText, ShardLock lock, IndexSettings 
indexSettings, IndexRemovalReason reason) throws IOException; - void addPendingDelete(ShardId shardId, IndexSettings indexSettings); + void addPendingDelete(ShardId shardId, IndexSettings indexSettings, IndexRemovalReason reason); } public final EngineFactory getEngineFactory() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 92af26228948c..d0113897432de 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -15,7 +15,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.indices.cluster.IndexRemovalReason; /** * An index event listener is the primary extension point for plugins and build-in services diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 27e7273ac506f..73747bc798d30 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -133,6 +133,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; @@ -1021,7 +1022,7 @@ public void removeIndex( listener.afterIndexRemoved(indexService.index(), indexSettings, reason); if (reason == IndexRemovalReason.DELETED) { // now we are done - try to wipe data on disk if possible - deleteIndexStore(extraInfo, indexService.index(), indexSettings); + deleteIndexStore(extraInfo, indexService.index(), indexSettings, reason); } })); }); @@ -1105,7 +1106,7 @@ public void deleteUnassignedIndex(String reason, IndexMetadata oldIndexMetadata, + "]" ); } - deleteIndexStore(reason, oldIndexMetadata); + deleteIndexStore(reason, oldIndexMetadata, IndexRemovalReason.DELETED); } catch (Exception e) { logger.warn(() -> format("[%s] failed to delete unassigned index (reason [%s])", oldIndexMetadata.getIndex(), reason), e); } @@ -1118,7 +1119,7 @@ public void deleteUnassignedIndex(String reason, IndexMetadata oldIndexMetadata, * * Package private for testing */ - void deleteIndexStore(String reason, IndexMetadata metadata) throws IOException { + void deleteIndexStore(String reasonText, IndexMetadata metadata, IndexRemovalReason reason) throws IOException { if (nodeEnv.hasNodeFile()) { synchronized (this) { Index index = metadata.getIndex(); @@ -1136,33 +1137,35 @@ void deleteIndexStore(String reason, IndexMetadata metadata) throws IOException } } final IndexSettings indexSettings = buildIndexSettings(metadata); - deleteIndexStore(reason, indexSettings.getIndex(), indexSettings); + deleteIndexStore(reasonText, indexSettings.getIndex(), indexSettings, reason); } } - private void deleteIndexStore(String reason, Index index, IndexSettings indexSettings) throws IOException { - deleteIndexStoreIfDeletionAllowed(reason, index, indexSettings, 
DEFAULT_INDEX_DELETION_PREDICATE); + private void deleteIndexStore(String reasonText, Index index, IndexSettings indexSettings, IndexRemovalReason reason) + throws IOException { + deleteIndexStoreIfDeletionAllowed(reasonText, index, indexSettings, DEFAULT_INDEX_DELETION_PREDICATE, reason); } private void deleteIndexStoreIfDeletionAllowed( - final String reason, + final String reasonText, final Index index, final IndexSettings indexSettings, - final IndexDeletionAllowedPredicate predicate + final IndexDeletionAllowedPredicate predicate, + final IndexRemovalReason reason ) throws IOException { boolean success = false; try { // we are trying to delete the index store here - not a big deal if the lock can't be obtained // the store metadata gets wiped anyway even without the lock this is just best effort since // every shards deletes its content under the shard lock it owns. - logger.debug("{} deleting index store reason [{}]", index, reason); + logger.debug("{} deleting index store reason [{}]", index, reasonText); if (predicate.apply(index, indexSettings)) { // its safe to delete all index metadata and shard data nodeEnv.deleteIndexDirectorySafe( index, 0, indexSettings, - paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths) + paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths, reason) ); } success = true; @@ -1172,7 +1175,7 @@ private void deleteIndexStoreIfDeletionAllowed( logger.warn(() -> format("%s failed to delete index", index), ex); } finally { if (success == false) { - addPendingDelete(index, indexSettings); + addPendingDelete(index, indexSettings, reason); } // this is a pure protection to make sure this index doesn't get re-imported as a dangling index. // we should in the future rather write a tombstone rather than wiping the metadata. @@ -1182,19 +1185,21 @@ private void deleteIndexStoreIfDeletionAllowed( /** * Deletes the shard with an already acquired shard lock. - * @param reason the reason for the shard deletion + * @param reasonText the reason for the shard deletion * @param lock the lock of the shard to delete * @param indexSettings the shards index settings. + * @param reason the reason for the deletion (as an enum) * @throws IOException if an IOException occurs */ @Override - public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException { + public void deleteShardStore(String reasonText, ShardLock lock, IndexSettings indexSettings, IndexRemovalReason reason) + throws IOException { ShardId shardId = lock.getShardId(); - logger.trace("{} deleting shard reason [{}]", shardId, reason); + logger.trace("{} deleting shard reason [{}]", shardId, reasonText); nodeEnv.deleteShardDirectoryUnderLock( lock, indexSettings, - paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths) + paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths, reason) ); } @@ -1206,13 +1211,14 @@ public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexS * On data nodes, if the deleted shard is the last shard folder in its index, the method will attempt to remove * the index folder as well. * - * @param reason the reason for the shard deletion + * @param reasonText the reason for the shard deletion * @param shardId the shards ID to delete * @param clusterState . This is required to access the indexes settings etc. 
+ * @param reason The reason for the deletion (as an enum) * @throws IOException if an IOException occurs */ - public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) throws IOException, - ShardLockObtainFailedException { + public void deleteShardStore(String reasonText, ShardId shardId, ClusterState clusterState, IndexRemovalReason reason) + throws IOException, ShardLockObtainFailedException { final IndexMetadata metadata = clusterState.getMetadata().getProject().indices().get(shardId.getIndexName()); final IndexSettings indexSettings = buildIndexSettings(metadata); @@ -1223,15 +1229,15 @@ public void deleteShardStore(String reason, ShardId shardId, ClusterState cluste nodeEnv.deleteShardDirectorySafe( shardId, indexSettings, - paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths) + paths -> indexFoldersDeletionListeners.beforeShardFoldersDeleted(shardId, indexSettings, paths, reason) ); - logger.debug("{} deleted shard reason [{}]", shardId, reason); + logger.debug("{} deleted shard reason [{}]", shardId, reasonText); if (canDeleteIndexContents(shardId.getIndex())) { if (nodeEnv.findAllShardIds(shardId.getIndex()).isEmpty()) { try { // note that deleteIndexStore have more safety checks and may throw an exception if index was concurrently created. - deleteIndexStore("no longer used", metadata); + deleteIndexStore("no longer used", metadata, reason); } catch (Exception e) { // wrap the exception to indicate we already deleted the shard throw new ElasticsearchException("failed to delete unused index after deleting its last shard (" + shardId + ")", e); @@ -1287,7 +1293,7 @@ public IndexMetadata verifyIndexIsDeleted(final Index index, final ClusterState } final IndexSettings indexSettings = buildIndexSettings(metadata); try { - deleteIndexStoreIfDeletionAllowed("stale deleted index", index, indexSettings, ALWAYS_TRUE); + deleteIndexStoreIfDeletionAllowed("stale deleted index", index, indexSettings, ALWAYS_TRUE, IndexRemovalReason.DELETED); } catch (Exception e) { // we just warn about the exception here because if deleteIndexStoreIfDeletionAllowed // throws an exception, it gets added to the list of pending deletes to be tried again @@ -1345,22 +1351,22 @@ private IndexSettings buildIndexSettings(IndexMetadata metadata) { * Adds a pending delete for the given index shard. */ @Override - public void addPendingDelete(ShardId shardId, IndexSettings settings) { + public void addPendingDelete(ShardId shardId, IndexSettings settings, IndexRemovalReason reason) { if (shardId == null) { throw new IllegalArgumentException("shardId must not be null"); } if (settings == null) { throw new IllegalArgumentException("settings must not be null"); } - PendingDelete pendingDelete = new PendingDelete(shardId, settings); + PendingDelete pendingDelete = new PendingDelete(shardId, settings, reason); addPendingDelete(shardId.getIndex(), pendingDelete); } /** * Adds a pending delete for the given index. 
*/ - public void addPendingDelete(Index index, IndexSettings settings) { - PendingDelete pendingDelete = new PendingDelete(index, settings); + public void addPendingDelete(Index index, IndexSettings settings, IndexRemovalReason reason) { + PendingDelete pendingDelete = new PendingDelete(index, settings, reason); addPendingDelete(index, pendingDelete); } @@ -1376,25 +1382,28 @@ private static final class PendingDelete implements Comparable { final int shardId; final IndexSettings settings; final boolean deleteIndex; + final IndexRemovalReason reason; /** * Creates a new pending delete of an index */ - PendingDelete(ShardId shardId, IndexSettings settings) { + PendingDelete(ShardId shardId, IndexSettings settings, IndexRemovalReason reason) { this.index = shardId.getIndex(); this.shardId = shardId.getId(); this.settings = settings; this.deleteIndex = false; + this.reason = reason; } /** * Creates a new pending delete of a shard */ - PendingDelete(Index index, IndexSettings settings) { + PendingDelete(Index index, IndexSettings settings, IndexRemovalReason reason) { this.index = index; this.shardId = -1; this.settings = settings; this.deleteIndex = true; + this.reason = reason; } @Override @@ -1458,7 +1467,12 @@ public void processPendingDeletes(Index index, IndexSettings indexSettings, Time nodeEnv.deleteIndexDirectoryUnderLock( index, indexSettings, - paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted(index, indexSettings, paths) + paths -> indexFoldersDeletionListeners.beforeIndexFoldersDeleted( + index, + indexSettings, + paths, + delete.reason + ) ); iterator.remove(); } catch (IOException ex) { @@ -1470,7 +1484,7 @@ public void processPendingDeletes(Index index, IndexSettings indexSettings, Time final ShardLock shardLock = locks.get(shardId); if (shardLock != null) { try { - deleteShardStore("pending delete", shardLock, delete.settings); + deleteShardStore("pending delete", shardLock, delete.settings, delete.reason); iterator.remove(); } catch (IOException ex) { logger.debug(() -> format("%s retry pending delete", shardLock.getShardId()), ex); diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndexRemovalReason.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndexRemovalReason.java new file mode 100644 index 0000000000000..01fcd8c680bec --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndexRemovalReason.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.cluster; + +/** + * The reasons why an index or shard is being removed from a node. + */ +public enum IndexRemovalReason { + /** + * Shard of this index were previously assigned to this node but all shards have been relocated. + * The index should be removed and all associated resources released. Persistent parts of the index + * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. + */ + NO_LONGER_ASSIGNED, + + /** + * The index is deleted. 
Persistent parts of the index like the shards files, state and transaction logs are removed once + * all resources are released. + */ + DELETED, + + /** + * The index has been closed. The index should be removed and all associated resources released. Persistent parts of the index + * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. + */ + CLOSED, + + /** + * Something around index management has failed and the index should be removed. + * Persistent parts of the index like the shards files, state and transaction logs are kept around in the + * case of a disaster recovery. + */ + FAILURE, + + /** + * The index has been reopened. The index should be removed and all associated resources released. Persistent parts of the index + * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. + */ + REOPENED, + + /** + * The index is closed as part of the node shutdown process. The index should be removed and all associated resources released. + * Persistent parts of the index like the shards files, state and transaction logs should be kept around in the case the node + * restarts. + */ + SHUTDOWN, +} diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c0e897bc34319..ff1a83d26592f 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -97,11 +97,11 @@ import java.util.function.Consumer; import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.CLOSED; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.FAILURE; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.REOPENED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.CLOSED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.DELETED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.FAILURE; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.NO_LONGER_ASSIGNED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.REOPENED; public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateApplier { private static final Logger logger = LogManager.getLogger(IndicesClusterStateService.class); @@ -480,7 +480,7 @@ private void removeIndicesAndShards(final ClusterChangedEvent event) { final IndexMetadata indexMetadata = project.map(proj -> proj.index(index)).orElse(null); final IndexMetadata existingMetadata = indexService.getIndexSettings().getIndexMetadata(); - AllocatedIndices.IndexRemovalReason reason = null; + IndexRemovalReason reason = null; if (indexMetadata != null && indexMetadata.getState() != existingMetadata.getState()) { reason = indexMetadata.getState() == IndexMetadata.State.CLOSE ? 
CLOSED : REOPENED; } else if (localRoutingNode == null || localRoutingNode.hasIndex(index) == false) { @@ -1335,47 +1335,6 @@ default T getShardOrNull(ShardId shardId) { void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException, InterruptedException, ShardLockObtainFailedException; - - enum IndexRemovalReason { - /** - * Shard of this index were previously assigned to this node but all shards have been relocated. - * The index should be removed and all associated resources released. Persistent parts of the index - * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. - */ - NO_LONGER_ASSIGNED, - - /** - * The index is deleted. Persistent parts of the index like the shards files, state and transaction logs are removed once - * all resources are released. - */ - DELETED, - - /** - * The index has been closed. The index should be removed and all associated resources released. Persistent parts of the index - * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. - */ - CLOSED, - - /** - * Something around index management has failed and the index should be removed. - * Persistent parts of the index like the shards files, state and transaction logs are kept around in the - * case of a disaster recovery. - */ - FAILURE, - - /** - * The index has been reopened. The index should be removed and all associated resources released. Persistent parts of the index - * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. - */ - REOPENED, - - /** - * The index is closed as part of the node shutdown process. The index should be removed and all associated resources released. - * Persistent parts of the index like the shards files, state and transaction logs should be kept around in the case the node - * restarts. 
- */ - SHUTDOWN, - } } static class ShardCloseExecutor implements Executor { diff --git a/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java index 0fffef74df25f..1cc57469bc955 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java +++ b/server/src/main/java/org/elasticsearch/indices/store/CompositeIndexFoldersDeletionListener.java @@ -12,6 +12,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.plugins.IndexStorePlugin; import java.nio.file.Path; @@ -31,10 +32,10 @@ public CompositeIndexFoldersDeletionListener(List { if (clusterStateVersion != currentState.getVersion()) { @@ -349,7 +350,7 @@ private void deleteShardStoreOnApplierThread(ShardId shardId, long clusterStateV return; } try { - indicesService.deleteShardStore("no longer used", shardId, currentState); + indicesService.deleteShardStore("no longer used", shardId, currentState, indexRemovalReason); } catch (Exception ex) { logger.debug(() -> format("%s failed to delete unallocated shard, ignoring", shardId), ex); } diff --git a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java index 4b7520afbe7b6..73be788079c4f 100644 --- a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java @@ -19,6 +19,7 @@ import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.indices.recovery.RecoveryState; import java.io.IOException; @@ -102,8 +103,9 @@ interface IndexFoldersDeletionListener { * @param index the {@link Index} of the index whose folders are going to be deleted * @param indexSettings settings for the index whose folders are going to be deleted * @param indexPaths the paths of the folders that are going to be deleted + * @param reason the reason for the deletion */ - void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths); + void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths, IndexRemovalReason reason); /** * Invoked before the folders of a shard are deleted from disk. 
The list of folders contains {@link Path}s that may or may not @@ -112,8 +114,9 @@ interface IndexFoldersDeletionListener { * @param shardId the {@link ShardId} of the shard whose folders are going to be deleted * @param indexSettings index settings of the shard whose folders are going to be deleted * @param shardPaths the paths of the folders that are going to be deleted + * @param reason the reason for the deletion */ - void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths); + void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths, IndexRemovalReason reason); } /** diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 2a814a1a36489..d485b53e7e409 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -78,7 +78,7 @@ import org.elasticsearch.indices.ExecutorSelector; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.script.FieldScript; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.AggregationInitializationException; diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 565037eba8369..043b982ad4344 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -86,7 +86,7 @@ import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.IndexStorePlugin; @@ -144,18 +144,18 @@ public class IndexModuleTests extends ESTestCase { private IndexService.ShardStoreDeleter deleter = new IndexService.ShardStoreDeleter() { @Override - public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException {} + public void deleteShardStore(String reasonText, ShardLock lock, IndexSettings indexSettings, IndexRemovalReason reason) {} @Override - public void addPendingDelete(ShardId shardId, IndexSettings indexSettings) {} + public void addPendingDelete(ShardId shardId, IndexSettings indexSettings, IndexRemovalReason reason) {} }; private IndexStorePlugin.IndexFoldersDeletionListener indexDeletionListener = new IndexStorePlugin.IndexFoldersDeletionListener() { @Override - public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) {} + public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths, IndexRemovalReason reason) {} @Override - public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) {} + public void beforeShardFoldersDeleted(ShardId 
shardId, IndexSettings indexSettings, Path[] shardPaths, IndexRemovalReason reason) {} }; private final IndexFieldDataCache.Listener listener = new IndexFieldDataCache.Listener() { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 619714119a05e..ad69ad162190f 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -26,7 +26,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptySet; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.DELETED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCase { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index d1344fbdb2d80..967dd3d7626b7 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -65,6 +65,7 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.similarity.NonNegativeScoresSimilarity; import org.elasticsearch.indices.IndicesService.ShardDeletionCheckResult; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; @@ -323,7 +324,7 @@ public void testDeleteIndexStore() throws Exception { test.getIndexSettings().customDataPath() ); - expectThrows(IllegalStateException.class, () -> indicesService.deleteIndexStore("boom", firstMetadata)); + expectThrows(IllegalStateException.class, () -> indicesService.deleteIndexStore("boom", firstMetadata, randomReason())); assertTrue(firstPath.exists()); GatewayMetaState gwMetaState = getInstanceFromNode(GatewayMetaState.class); @@ -353,7 +354,7 @@ public void testDeleteIndexStore() throws Exception { ); assertTrue(secondPath.exists()); - expectThrows(IllegalStateException.class, () -> indicesService.deleteIndexStore("boom", secondMetadata)); + expectThrows(IllegalStateException.class, () -> indicesService.deleteIndexStore("boom", secondMetadata, randomReason())); assertTrue(secondPath.exists()); assertAcked(client().admin().indices().prepareOpen("test")); @@ -384,13 +385,13 @@ public void testPendingTasks() throws Exception { int numPending = 1; if (randomBoolean()) { - indicesService.addPendingDelete(indexShard.shardId(), indexSettings); + indicesService.addPendingDelete(indexShard.shardId(), indexSettings, randomReason()); } else { if (randomBoolean()) { numPending++; - 
indicesService.addPendingDelete(indexShard.shardId(), indexSettings); + indicesService.addPendingDelete(indexShard.shardId(), indexSettings, randomReason()); } - indicesService.addPendingDelete(index, indexSettings); + indicesService.addPendingDelete(index, indexSettings, randomReason()); } assertAcked(client().admin().indices().prepareClose("test")); @@ -410,9 +411,9 @@ public void testPendingTasks() throws Exception { final boolean hasBogus = randomBoolean(); if (hasBogus) { - indicesService.addPendingDelete(new ShardId(index, 0), indexSettings); - indicesService.addPendingDelete(new ShardId(index, 1), indexSettings); - indicesService.addPendingDelete(new ShardId("bogus", "_na_", 1), indexSettings); + indicesService.addPendingDelete(new ShardId(index, 0), indexSettings, randomReason()); + indicesService.addPendingDelete(new ShardId(index, 1), indexSettings, randomReason()); + indicesService.addPendingDelete(new ShardId("bogus", "_na_", 1), indexSettings, randomReason()); assertEquals(indicesService.numPendingDeletes(index), numPending + 2); assertTrue(indicesService.hasUncompletedPendingDeletes()); } @@ -884,4 +885,8 @@ public void testWithTempIndexServiceHandlesExistingIndex() throws Exception { private Set resolvedExpressions(String... expressions) { return Arrays.stream(expressions).map(ResolvedExpression::new).collect(Collectors.toSet()); } + + private IndexRemovalReason randomReason() { + return randomFrom(IndexRemovalReason.values()); + } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index 66b9e69aea906..9ef888da81596 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -159,7 +159,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED; +import static org.elasticsearch.indices.cluster.IndexRemovalReason.DELETED; import static org.elasticsearch.search.SearchService.DEFAULT_SIZE; import static org.elasticsearch.search.SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED; import static org.elasticsearch.search.SearchService.SEARCH_WORKER_THREADS_ENABLED; diff --git a/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java b/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java index ba5ffa49c33a0..10a31cbf3fae0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java +++ b/test/framework/src/main/java/org/elasticsearch/test/MockIndexEventListener.java @@ -21,7 +21,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.indices.cluster.IndexRemovalReason; import org.elasticsearch.plugins.Plugin; import java.util.Arrays; diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java index 4f9ac3eb99348..64d4c8d4dc511 100644 --- 
a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java @@ -27,7 +27,8 @@ public List> getSettings() { SharedBlobCacheService.SHARED_CACHE_DECAY_INTERVAL_SETTING, SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING, SharedBlobCacheService.SHARED_CACHE_MMAP, - SharedBlobCacheService.SHARED_CACHE_COUNT_READS + SharedBlobCacheService.SHARED_CACHE_COUNT_READS, + SharedBlobCacheService.SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING ); } } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 05b39f32bfc7a..6e9cc114fa16a 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.RelativeByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; @@ -97,6 +98,13 @@ public class SharedBlobCacheService implements Releasable { Setting.Property.NodeScope ); + public static final Setting SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING = Setting.intSetting( + SHARED_CACHE_SETTINGS_PREFIX + "concurrent_evictions", + 5, + 1, + Setting.Property.NodeScope + ); + private static Setting.Validator getPageSizeAlignedByteSizeValueValidator(String settingName) { return value -> { if (value.getBytes() == -1) { @@ -283,6 +291,8 @@ private interface Cache extends Releasable { CacheEntry get(K cacheKey, long fileLength, int region); int forceEvict(Predicate cacheKeyPredicate); + + void forceEvictAsync(Predicate cacheKey); } private abstract static class CacheEntry { @@ -328,6 +338,7 @@ private CacheEntry(T chunk) { private final Runnable evictIncrementer; private final LongSupplier relativeTimeInNanosSupplier; + private final ThrottledTaskRunner asyncEvictionsRunner; public SharedBlobCacheService( NodeEnvironment environment, @@ -388,6 +399,11 @@ public SharedBlobCacheService( this.blobCacheMetrics = blobCacheMetrics; this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment; this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier; + this.asyncEvictionsRunner = new ThrottledTaskRunner( + "shared_blob_cache_evictions", + SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING.get(settings), + threadPool.generic() + ); } public static long calculateCacheSize(Settings settings, long totalFsSize) { @@ -709,6 +725,15 @@ public int forceEvict(Predicate cacheKeyPredicate) { } + /** + * Evict entries from the cache that match the given predicate asynchronously + * + * @param cacheKeyPredicate + */ + public void forceEvictAsync(Predicate cacheKeyPredicate) { + cache.forceEvictAsync(cacheKeyPredicate); + } + // used by tests int getFreq(CacheFileRegion cacheFileRegion) { if (cache instanceof LFUCache lfuCache) { @@ -1611,6 +1636,26 @@ public int forceEvict(Predicate cacheKeyPredicate) { return evictedCount; } + @Override + public void forceEvictAsync(Predicate cacheKeyPredicate) { + asyncEvictionsRunner.enqueueTask(new 
ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + forceEvict(cacheKeyPredicate); + } + } + + @Override + public void onFailure(Exception e) { + // should be impossible, GENERIC pool doesn't reject anything + final String message = "unexpected failure evicting from shared blob cache"; + logger.error(message, e); + assert false : new AssertionError(message, e); + } + }); + } + private LFUCacheEntry initChunk(LFUCacheEntry entry) { assert Thread.holdsLock(entry.chunk); RegionKey regionKey = entry.chunk.regionKey; diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index 8364cb3078466..04658606ce132 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -272,6 +272,44 @@ public void testForceEvictResponse() throws IOException { } } + public void testAsynchronousEviction() throws Exception { + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC), + BlobCacheMetrics.NOOP + ) + ) { + final var cacheKey1 = generateCacheKey(); + final var cacheKey2 = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + final var region0 = cacheService.get(cacheKey1, size(250), 0); + assertEquals(4, cacheService.freeRegionCount()); + final var region1 = cacheService.get(cacheKey2, size(250), 1); + assertEquals(3, cacheService.freeRegionCount()); + assertFalse(region0.isEvicted()); + assertFalse(region1.isEvicted()); + cacheService.forceEvictAsync(ck -> ck == cacheKey1); + assertFalse(region0.isEvicted()); + assertFalse(region1.isEvicted()); + // run the async task + taskQueue.runAllRunnableTasks(); + assertTrue(region0.isEvicted()); + assertFalse(region1.isEvicted()); + assertEquals(4, cacheService.freeRegionCount()); + } + } + public void testDecay() throws IOException { // we have 8 regions Settings settings = Settings.builder() diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/SharedCacheEvictionTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/SharedCacheEvictionTests.java new file mode 100644 index 0000000000000..b0f386bd78a2f --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/SharedCacheEvictionTests.java @@ -0,0 +1,236 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache.shared; + +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.blobcache.BlobCacheMetrics; +import org.elasticsearch.blobcache.shared.SharedBlobCacheService; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardStateMetadata; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; +import org.elasticsearch.xpack.searchablesnapshots.BaseFrozenSearchableSnapshotsIntegTestCase; +import org.elasticsearch.xpack.searchablesnapshots.LocalStateSearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheKey; +import org.junit.Before; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.contains; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +public class SharedCacheEvictionTests extends BaseFrozenSearchableSnapshotsIntegTestCase { + + private static final Map> sharedBlobCacheServices = new ConcurrentHashMap<>(); + + @Override + protected Collection> nodePlugins() { + Collection> classes = super.nodePlugins(); + return Stream.concat( + classes.stream().filter(plugin -> plugin != LocalStateSearchableSnapshots.class), + Stream.of(SpyableSharedCacheSearchableSnapshots.class) + ).toList(); + } + + @SuppressWarnings("unchecked") + @Before + public void clearEvictionStats() { + sharedBlobCacheServices.values().forEach(Mockito::clearInvocations); + } + + public void testPartialShardsAreEvictedAsynchronouslyOnDelete() throws Exception { + final String mountedSnapshotName = "mounted_" + randomIdentifier(); + snapshotAndMount(mountedSnapshotName, MountSearchableSnapshotRequest.Storage.SHARED_CACHE); + + final Map allocations = getShardCounts(mountedSnapshotName); + + assertAcked(indicesAdmin().prepareDelete(mountedSnapshotName)); + allocations.forEach((nodeId, shardCount) -> { + SharedBlobCacheService sharedBlobCacheService = sharedBlobCacheServices.get(nodeId); + verify(sharedBlobCacheService, Mockito.atLeast(shardCount)).forceEvictAsync(ArgumentMatchers.any()); + verify(sharedBlobCacheService, never()).forceEvict(ArgumentMatchers.any()); + }); + } + + /** + * Fully mounted snapshots don't use the shared blob cache, so we 
don't need to evict them from it + */ + public void testFullFullShardsAreNotEvictedOnDelete() throws Exception { + final String mountedSnapshotName = "mounted_" + randomIdentifier(); + snapshotAndMount(mountedSnapshotName, MountSearchableSnapshotRequest.Storage.FULL_COPY); + + final Map allocations = getShardCounts(mountedSnapshotName); + + assertAcked(indicesAdmin().prepareDelete(mountedSnapshotName)); + allocations.forEach((nodeId, shardCount) -> { + SharedBlobCacheService sharedBlobCacheService = sharedBlobCacheServices.get(nodeId); + verify(sharedBlobCacheService, never()).forceEvictAsync(ArgumentMatchers.any()); + verify(sharedBlobCacheService, never()).forceEvict(ArgumentMatchers.any()); + }); + } + + /** + * We let relocated shards age out of the cache, rather than evicting them + */ + public void testPartialShardsAreNotEvictedOnRelocate() throws Exception { + final String mountedSnapshotName = "mounted_" + randomIdentifier(); + snapshotAndMount(mountedSnapshotName, MountSearchableSnapshotRequest.Storage.SHARED_CACHE); + + final Map allocations = getShardCounts(mountedSnapshotName); + + // Create another node to relocate to if we need to + if (internalCluster().numDataNodes() == 1) { + internalCluster().startNode(); + ensureStableCluster(2); + } + + final String nodeToVacateId = randomFrom(allocations.keySet()); + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._id", nodeToVacateId)); + try { + waitForRelocation(ClusterHealthStatus.GREEN); + assertThat(getShardCounts(mountedSnapshotName).keySet(), not(contains(nodeToVacateId))); + + final SharedBlobCacheService sharedBlobCacheService = sharedBlobCacheServices.get(nodeToVacateId); + verify(sharedBlobCacheService, never()).forceEvictAsync(ArgumentMatchers.any()); + verify(sharedBlobCacheService, never()).forceEvict(ArgumentMatchers.any()); + } finally { + updateClusterSettings(Settings.builder().putNull("cluster.routing.allocation.exclude._id")); + } + } + + public void testPartialShardsAreEvictedSynchronouslyOnFailure() throws Exception { + final String mountedSnapshotName = "mounted_" + randomIdentifier(); + + snapshotAndMount(mountedSnapshotName, MountSearchableSnapshotRequest.Storage.SHARED_CACHE); + + final IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(mountedSnapshotName).get(); + final Set allShards = Arrays.stream(indicesStatsResponse.getIndex(mountedSnapshotName).getShards()) + .map(sh -> sh.getShardRouting().shardId()) + .collect(Collectors.toSet()); + final String indexUUID = indicesStatsResponse.getIndex(mountedSnapshotName).getUuid(); + + // Add a node to the cluster, we'll force relocation to it + final String targetNode = internalCluster().startNode(); + + // Put some conflicting shard state in the new node's shard paths to trigger a failure + final NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, targetNode); + for (ShardId shardId : allShards) { + for (Path p : nodeEnvironment.availableShardPaths(shardId)) { + ShardStateMetadata.FORMAT.write( + new ShardStateMetadata(true, randomValueOtherThan(indexUUID, ESTestCase::randomIdentifier), null), + p + ); + } + } + + // Force relocation, it should fail + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.require._name", targetNode)); + + try { + waitForRelocation(); + + final SharedBlobCacheService sharedBlobCacheService = sharedBlobCacheServices.get(getNodeId(targetNode)); + verify(sharedBlobCacheService, never()).forceEvictAsync(ArgumentMatchers.any()); 
+            verify(sharedBlobCacheService, Mockito.atLeast(allShards.size())).forceEvict(ArgumentMatchers.any());
+        } finally {
+            updateClusterSettings(Settings.builder().putNull("cluster.routing.allocation.require._name"));
+        }
+    }
+
+    private Map<String, Integer> getShardCounts(String indexName) {
+        final IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(indexName).get();
+        final Map<String, Integer> allocations = new HashMap<>();
+        Arrays.stream(indicesStatsResponse.getShards())
+            .forEach(shardStats -> allocations.compute(shardStats.getShardRouting().currentNodeId(), (s, v) -> v == null ? 1 : v + 1));
+        assertThat(allocations, not(anEmptyMap()));
+        return allocations;
+    }
+
+    private void snapshotAndMount(String mountedSnapshotName, MountSearchableSnapshotRequest.Storage storage) throws Exception {
+        final String repositoryName = randomIdentifier();
+        final String indexName = randomValueOtherThan(mountedSnapshotName, ESTestCase::randomIdentifier);
+        final String snapshotName = randomIdentifier();
+
+        createRepository(repositoryName, "fs");
+        createIndexWithRandomDocs(indexName, randomIntBetween(10, 300));
+        createSnapshot(repositoryName, snapshotName, List.of(indexName));
+        mountSnapshot(repositoryName, snapshotName, indexName, mountedSnapshotName, Settings.EMPTY, storage);
+        ensureGreen(mountedSnapshotName);
+    }
+
+    public static class SpyableSharedCacheSearchableSnapshots extends LocalStateCompositeXPackPlugin implements SystemIndexPlugin {
+
+        private final SearchableSnapshots plugin;
+
+        public SpyableSharedCacheSearchableSnapshots(final Settings settings, final Path configPath) {
+            super(settings, configPath);
+            this.plugin = new SearchableSnapshots(settings) {
+
+                @Override
+                protected XPackLicenseState getLicenseState() {
+                    return SpyableSharedCacheSearchableSnapshots.this.getLicenseState();
+                }
+
+                @Override
+                protected SharedBlobCacheService<CacheKey> createSharedBlobCacheService(
+                    Settings settings,
+                    ThreadPool threadPool,
+                    NodeEnvironment nodeEnvironment,
+                    BlobCacheMetrics blobCacheMetrics
+                ) {
+                    final SharedBlobCacheService<CacheKey> spy = Mockito.spy(
+                        super.createSharedBlobCacheService(settings, threadPool, nodeEnvironment, blobCacheMetrics)
+                    );
+                    sharedBlobCacheServices.put(nodeEnvironment.nodeId(), spy);
+                    return spy;
+                }
+            };
+            plugins.add(plugin);
+        }
+
+        @Override
+        public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
+            return plugin.getSystemIndexDescriptors(settings);
+        }
+
+        @Override
+        public String getFeatureName() {
+            return plugin.getFeatureName();
+        }
+
+        @Override
+        public String getFeatureDescription() {
+            return plugin.getFeatureDescription();
+        }
+    }
+}
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
index 69c1530453f7f..cb4d0f5eeda37 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
@@ -323,12 +323,12 @@ public Collection createComponents(PluginServices services) {
         if (DiscoveryNode.canContainData(settings)) {
             final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment));
             this.cacheService.set(cacheService);
-            final SharedBlobCacheService<CacheKey> sharedBlobCacheService = new SharedBlobCacheService<>(
-                nodeEnvironment,
+            final BlobCacheMetrics blobCacheMetrics = new BlobCacheMetrics(services.telemetryProvider().getMeterRegistry());
+            final SharedBlobCacheService<CacheKey> sharedBlobCacheService = createSharedBlobCacheService(
                 settings,
                 threadPool,
-                threadPool.executor(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME),
-                new BlobCacheMetrics(services.telemetryProvider().getMeterRegistry())
+                nodeEnvironment,
+                blobCacheMetrics
             );
             this.frozenCacheService.set(sharedBlobCacheService);
             components.add(cacheService);
@@ -367,6 +367,22 @@ public Collection createComponents(PluginServices services) {
         return Collections.unmodifiableList(components);
     }
 
+    // overridable for testing
+    protected SharedBlobCacheService<CacheKey> createSharedBlobCacheService(
+        final Settings settings,
+        final ThreadPool threadPool,
+        final NodeEnvironment nodeEnvironment,
+        final BlobCacheMetrics blobCacheMetrics
+    ) {
+        return new SharedBlobCacheService<>(
+            nodeEnvironment,
+            settings,
+            threadPool,
+            threadPool.executor(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME),
+            blobCacheMetrics
+        );
+    }
+
     @Override
     public void onIndexModule(IndexModule indexModule) {
         if (indexModule.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java
index 4caf932a99807..3ae1cc1444a2f 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexEventListener.java
@@ -22,7 +22,7 @@
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.ByteSizeCachingDirectory;
-import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
+import org.elasticsearch.indices.cluster.IndexRemovalReason;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
 import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheKey;
@@ -113,8 +113,18 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea
                 shardId
             );
         }
-        if (sharedBlobCacheService != null) {
-            sharedBlobCacheService.forceEvict(SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings()));
+        if (indexSettings.getIndexMetadata().isPartialSearchableSnapshot() && sharedBlobCacheService != null) {
+            switch (reason) {
+                // This index was deleted, it's not coming back - we can evict asynchronously
+                case DELETED -> sharedBlobCacheService.forceEvictAsync(
+                    SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings())
+                );
+                // A failure occurred - we should eagerly clear the state
+                case FAILURE -> sharedBlobCacheService.forceEvict(
+                    SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings())
+                );
+                // Any other reason - we let the cache entries expire naturally
+            }
         }
     }
 }
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexFoldersDeletionListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexFoldersDeletionListener.java
index 4d7174e0f7ff4..e6891ff859248 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexFoldersDeletionListener.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotIndexFoldersDeletionListener.java
@@ -13,6 +13,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.cluster.IndexRemovalReason;
 import org.elasticsearch.plugins.IndexStorePlugin;
 import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
 import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheKey;
@@ -46,22 +47,32 @@ public SearchableSnapshotIndexFoldersDeletionListener(
     }
 
     @Override
-    public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths) {
+    public void beforeIndexFoldersDeleted(
+        Index index,
+        IndexSettings indexSettings,
+        Path[] indexPaths,
+        IndexRemovalReason indexRemovalReason
+    ) {
         if (indexSettings.getIndexMetadata().isSearchableSnapshot()) {
             for (int shard = 0; shard < indexSettings.getNumberOfShards(); shard++) {
-                markShardAsEvictedInCache(new ShardId(index, shard), indexSettings);
+                markShardAsEvictedInCache(new ShardId(index, shard), indexSettings, indexRemovalReason);
             }
         }
     }
 
     @Override
-    public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths) {
+    public void beforeShardFoldersDeleted(
+        ShardId shardId,
+        IndexSettings indexSettings,
+        Path[] shardPaths,
+        IndexRemovalReason indexRemovalReason
+    ) {
         if (indexSettings.getIndexMetadata().isSearchableSnapshot()) {
-            markShardAsEvictedInCache(shardId, indexSettings);
+            markShardAsEvictedInCache(shardId, indexSettings, indexRemovalReason);
         }
     }
 
-    private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSettings) {
+    private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSettings, IndexRemovalReason indexRemovalReason) {
         final CacheService cacheService = this.cacheServiceSupplier.get();
         assert cacheService != null : "cache service not initialized";
@@ -72,8 +83,25 @@ private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSetti
             shardId
         );
 
-        final SharedBlobCacheService<CacheKey> sharedBlobCacheService = this.frozenCacheServiceSupplier.get();
-        assert sharedBlobCacheService != null : "frozen cache service not initialized";
-        sharedBlobCacheService.forceEvict(SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings()));
+        // Only partial searchable snapshots use the shared blob cache.
+        if (indexSettings.getIndexMetadata().isPartialSearchableSnapshot()) {
+            switch (indexRemovalReason) {
+                // The index was deleted, it's not coming back - we can evict asynchronously
+                case DELETED -> {
+                    final SharedBlobCacheService<CacheKey> sharedBlobCacheService =
+                        SearchableSnapshotIndexFoldersDeletionListener.this.frozenCacheServiceSupplier.get();
+                    assert sharedBlobCacheService != null : "frozen cache service not initialized";
+                    sharedBlobCacheService.forceEvictAsync(SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings()));
+                }
+                // An error occurred - we should eagerly clear the state
+                case FAILURE -> {
+                    final SharedBlobCacheService<CacheKey> sharedBlobCacheService =
+                        SearchableSnapshotIndexFoldersDeletionListener.this.frozenCacheServiceSupplier.get();
+                    assert sharedBlobCacheService != null : "frozen cache service not initialized";
+                    sharedBlobCacheService.forceEvict(SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings()));
+                }
+                // Any other reason - we let the cache entries expire naturally
+            }
+        }
     }
 }
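
Note: the following is an illustrative sketch, not part of the patch above. It shows how a plugin-side IndexFoldersDeletionListener might use the new IndexRemovalReason parameter to pick between eager and deferred cleanup, mirroring the DELETED/FAILURE dispatch introduced in this change. The ShardStateCache interface and its evictNow()/evictLater() methods are hypothetical placeholders for whatever per-shard state a plugin keeps; only the listener signatures and the reason constants come from the patch.

import java.nio.file.Path;

import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.cluster.IndexRemovalReason;
import org.elasticsearch.plugins.IndexStorePlugin;

public class ReasonAwareDeletionListener implements IndexStorePlugin.IndexFoldersDeletionListener {

    /** Hypothetical per-shard cache owned by the plugin. */
    interface ShardStateCache {
        void evictNow(ShardId shardId);    // synchronous eviction, used on failures
        void evictLater(ShardId shardId);  // asynchronous eviction, used on deletes
    }

    private final ShardStateCache cache;

    public ReasonAwareDeletionListener(ShardStateCache cache) {
        this.cache = cache;
    }

    @Override
    public void beforeIndexFoldersDeleted(Index index, IndexSettings indexSettings, Path[] indexPaths, IndexRemovalReason reason) {
        // Fan out to every shard of the index being removed
        for (int shard = 0; shard < indexSettings.getNumberOfShards(); shard++) {
            evict(new ShardId(index, shard), reason);
        }
    }

    @Override
    public void beforeShardFoldersDeleted(ShardId shardId, IndexSettings indexSettings, Path[] shardPaths, IndexRemovalReason reason) {
        evict(shardId, reason);
    }

    private void evict(ShardId shardId, IndexRemovalReason reason) {
        switch (reason) {
            case DELETED -> cache.evictLater(shardId);  // index is gone for good; don't block the applier thread
            case FAILURE -> cache.evictNow(shardId);    // clear possibly inconsistent state eagerly
            default -> { /* other reasons: let entries expire naturally */ }
        }
    }
}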