Skip to content

Commit 0666462

Browse files
authored
Use the system index descriptor in the snapshot blob cache cleanup task (#120937)
Clean up of the `.snapshot-blob-cache*` system index is done only on the node that hosts the primary of the shard 0 of that index. When the index is migrated as part of an upgrade test e.g. v7 -> v8, the index is reindexed to a new index `.snapshot-blob-cache-reindexed-for-9`. The code scheduling this clean up task is not able to locate the shard and would never trigger a clean up after the upgrade. This change uses the system index descriptor to find the matching shard and would work for future versions too. Closes #120518
1 parent 1fa1ba7 commit 0666462

File tree

4 files changed

+77
-16
lines changed

4 files changed

+77
-16
lines changed

docs/changelog/120937.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 120937
2+
summary: Use the system index descriptor in the snapshot blob cache cleanup task
3+
area: Snapshot/Restore
4+
type: bug
5+
issues:
6+
- 120518

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.elasticsearch.core.TimeValue;
2424
import org.elasticsearch.core.Tuple;
2525
import org.elasticsearch.index.IndexNotFoundException;
26+
import org.elasticsearch.index.reindex.ReindexAction;
27+
import org.elasticsearch.index.reindex.ReindexRequest;
2628
import org.elasticsearch.index.shard.ShardId;
2729
import org.elasticsearch.index.store.LuceneFilesExtensions;
30+
import org.elasticsearch.indices.SystemIndices;
2831
import org.elasticsearch.plugins.Plugin;
2932
import org.elasticsearch.reindex.ReindexPlugin;
3033
import org.elasticsearch.repositories.IndexId;
@@ -63,9 +66,11 @@
6366
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
6467
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
6568
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
69+
import static org.hamcrest.Matchers.empty;
6670
import static org.hamcrest.Matchers.equalTo;
6771
import static org.hamcrest.Matchers.greaterThan;
6872
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
73+
import static org.hamcrest.Matchers.is;
6974

7075
public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends BaseFrozenSearchableSnapshotsIntegTestCase {
7176

@@ -194,7 +199,6 @@ public void testCleanUpAfterIndicesAreDeleted() throws Exception {
194199
}
195200
});
196201
}
197-
198202
logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
199203
assertAcked(indicesAdmin().prepareDelete("mounted-*"));
200204
assertBusy(() -> {
@@ -311,6 +315,46 @@ public void testPeriodicMaintenance() throws Exception {
311315
}
312316
}
313317

318+
public void testCleanUpMigratedSystemIndexAfterIndicesAreDeleted() throws Exception {
319+
final String repositoryName = "repository";
320+
createRepository(repositoryName, FsRepository.TYPE);
321+
322+
final Map<String, Tuple<Settings, Long>> mountedIndices = mountRandomIndicesWithCache(repositoryName, 3, 10);
323+
ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
324+
refreshSystemIndex(true);
325+
326+
final long numberOfEntriesInCache = numberOfEntriesInCache();
327+
logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache);
328+
assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(Tuple::v2).sum()));
329+
330+
migrateTheSystemIndex();
331+
332+
logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
333+
assertAcked(indicesAdmin().prepareDelete("mounted-*"));
334+
assertBusy(() -> {
335+
refreshSystemIndex(true);
336+
assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0), 0L);
337+
});
338+
}
339+
340+
/**
341+
* Mimics migration of the {@link SearchableSnapshots#SNAPSHOT_BLOB_CACHE_INDEX} as done in
342+
* {@link org.elasticsearch.upgrades.SystemIndexMigrator}, where the index is re-indexed, and replaced by an alias.
343+
*/
344+
private void migrateTheSystemIndex() {
345+
final var migratedSnapshotBlobCache = SNAPSHOT_BLOB_CACHE_INDEX + SystemIndices.UPGRADED_INDEX_SUFFIX;
346+
logger.info("--> migrating {} system index to {}", SNAPSHOT_BLOB_CACHE_INDEX, migratedSnapshotBlobCache);
347+
var reindexRequest = new ReindexRequest().setSourceIndices(SNAPSHOT_BLOB_CACHE_INDEX)
348+
.setDestIndex(migratedSnapshotBlobCache)
349+
.setRefresh(true);
350+
var resp = safeGet(client().execute(ReindexAction.INSTANCE, reindexRequest));
351+
assertThat(resp.getBulkFailures(), is(empty()));
352+
indicesAdmin().prepareAliases()
353+
.removeIndex(SNAPSHOT_BLOB_CACHE_INDEX)
354+
.addAlias(migratedSnapshotBlobCache, SNAPSHOT_BLOB_CACHE_INDEX)
355+
.get();
356+
}
357+
314358
/**
315359
* @return a {@link Client} that can be used to query the blob store cache system index
316360
*/

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,14 @@ public Collection<?> createComponents(PluginServices services) {
337337
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(client, SNAPSHOT_BLOB_CACHE_INDEX);
338338
this.blobStoreCacheService.set(blobStoreCacheService);
339339
clusterService.addListener(
340-
new BlobStoreCacheMaintenanceService(settings, clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)
340+
new BlobStoreCacheMaintenanceService(
341+
settings,
342+
clusterService,
343+
threadPool,
344+
client,
345+
services.systemIndices(),
346+
SNAPSHOT_BLOB_CACHE_INDEX
347+
)
341348
);
342349
components.add(blobStoreCacheService);
343350
} else {

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
3737
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
3838
import org.elasticsearch.cluster.routing.IndexRoutingTable;
39-
import org.elasticsearch.cluster.routing.ShardRouting;
4039
import org.elasticsearch.cluster.service.ClusterService;
4140
import org.elasticsearch.common.bytes.BytesReference;
4241
import org.elasticsearch.common.document.DocumentField;
@@ -47,7 +46,6 @@
4746
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4847
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
4948
import org.elasticsearch.core.AbstractRefCounted;
50-
import org.elasticsearch.core.Nullable;
5149
import org.elasticsearch.core.RefCounted;
5250
import org.elasticsearch.core.TimeValue;
5351
import org.elasticsearch.core.Tuple;
@@ -57,6 +55,8 @@
5755
import org.elasticsearch.index.reindex.BulkByScrollResponse;
5856
import org.elasticsearch.index.reindex.DeleteByQueryAction;
5957
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
58+
import org.elasticsearch.indices.SystemIndexDescriptor;
59+
import org.elasticsearch.indices.SystemIndices;
6060
import org.elasticsearch.search.SearchHit;
6161
import org.elasticsearch.search.builder.PointInTimeBuilder;
6262
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -145,6 +145,7 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
145145
private final Client clientWithOrigin;
146146
private final String systemIndexName;
147147
private final ThreadPool threadPool;
148+
private final SystemIndexDescriptor systemIndexDescriptor;
148149

149150
private volatile Scheduler.Cancellable periodicTask;
150151
private volatile TimeValue periodicTaskInterval;
@@ -158,10 +159,12 @@ public BlobStoreCacheMaintenanceService(
158159
ClusterService clusterService,
159160
ThreadPool threadPool,
160161
Client client,
162+
SystemIndices systemIndices,
161163
String systemIndexName
162164
) {
163165
this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN);
164166
this.systemIndexName = Objects.requireNonNull(systemIndexName);
167+
this.systemIndexDescriptor = Objects.requireNonNull(systemIndices.findMatchingDescriptor(systemIndexName));
165168
this.clusterService = Objects.requireNonNull(clusterService);
166169
this.threadPool = Objects.requireNonNull(threadPool);
167170
this.periodicTaskInterval = SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.get(settings);
@@ -181,10 +184,7 @@ public void clusterChanged(ClusterChangedEvent event) {
181184
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
182185
return; // state not fully recovered
183186
}
184-
final ShardRouting primary = systemIndexPrimaryShard(state);
185-
if (primary == null
186-
|| primary.active() == false
187-
|| Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) {
187+
if (systemIndexPrimaryShardActiveAndAssignedToLocalNode(state) == false) {
188188
// system index primary shard does not exist or is not assigned to this data node
189189
stopPeriodicTask();
190190
return;
@@ -242,16 +242,20 @@ private synchronized void stopPeriodicTask() {
242242
}
243243
}
244244

245-
@Nullable
246-
private ShardRouting systemIndexPrimaryShard(final ClusterState state) {
247-
final IndexMetadata indexMetadata = state.metadata().index(systemIndexName);
248-
if (indexMetadata != null) {
249-
final IndexRoutingTable indexRoutingTable = state.routingTable().index(indexMetadata.getIndex());
250-
if (indexRoutingTable != null) {
251-
return indexRoutingTable.shard(0).primaryShard();
245+
private boolean systemIndexPrimaryShardActiveAndAssignedToLocalNode(final ClusterState state) {
246+
for (IndexMetadata indexMetadata : state.metadata()) {
247+
if (indexMetadata.isSystem() && systemIndexDescriptor.matchesIndexPattern(indexMetadata.getIndex().getName())) {
248+
final IndexRoutingTable indexRoutingTable = state.routingTable().index(indexMetadata.getIndex());
249+
if (indexRoutingTable == null || indexRoutingTable.shard(0) == null) {
250+
continue;
251+
}
252+
final var primary = indexRoutingTable.shard(0).primaryShard();
253+
if (primary != null && primary.active() && Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId())) {
254+
return true;
255+
}
252256
}
253257
}
254-
return null;
258+
return false;
255259
}
256260

257261
private static boolean hasSearchableSnapshotWith(final ClusterState state, final String snapshotId, final String indexId) {

0 commit comments

Comments
 (0)