Skip to content

Commit 43df7d7

Browse files
authored
Ignore closed indices for reindex (#120244) (#120562)
1 parent 00dbf11 commit 43df7d7

File tree

5 files changed

+224
-50
lines changed

5 files changed

+224
-50
lines changed

docs/changelog/120244.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120244
2+
summary: Ignore closed indices for reindex
3+
area: Data streams
4+
type: enhancement
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public static Predicate<Index> getReindexRequiredPredicate(Metadata metadata) {
3636
public static boolean reindexRequired(IndexMetadata indexMetadata) {
3737
return creationVersionBeforeMinimumWritableVersion(indexMetadata)
3838
&& isNotSearchableSnapshot(indexMetadata)
39+
&& isNotClosed(indexMetadata)
3940
&& isNotVerifiedReadOnly(indexMetadata);
4041
}
4142

@@ -52,4 +53,8 @@ private static boolean creationVersionBeforeMinimumWritableVersion(IndexMetadata
5253
return metadata.getCreationVersion().before(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
5354
}
5455

56+
private static boolean isNotClosed(IndexMetadata indexMetadata) {
57+
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE) == false;
58+
}
59+
5560
}

x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java

Lines changed: 153 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,56 +41,79 @@ public void testOldIndicesCheck() {
4141
int oldIndexCount = randomIntBetween(1, 100);
4242
int newIndexCount = randomIntBetween(1, 100);
4343

44-
List<Index> allIndices = new ArrayList<>();
4544
Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
4645
Set<String> expectedIndices = new HashSet<>();
4746

48-
for (int i = 0; i < oldIndexCount; i++) {
49-
Settings.Builder settings = settings(IndexVersion.fromId(7170099));
47+
DataStream dataStream = createTestDataStream(oldIndexCount, 0, newIndexCount, 0, nameToIndexMetadata, expectedIndices);
5048

51-
String indexName = "old-data-stream-index-" + i;
52-
if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) {
53-
settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
54-
} else {
55-
expectedIndices.add(indexName);
56-
}
49+
Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
50+
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
5751

58-
Settings.Builder settingsBuilder = settings;
59-
IndexMetadata oldIndexMetadata = IndexMetadata.builder(indexName)
60-
.settings(settingsBuilder)
61-
.numberOfShards(1)
62-
.numberOfReplicas(0)
63-
.build();
64-
allIndices.add(oldIndexMetadata.getIndex());
65-
nameToIndexMetadata.put(oldIndexMetadata.getIndex().getName(), oldIndexMetadata);
66-
}
52+
DeprecationIssue expected = new DeprecationIssue(
53+
DeprecationIssue.Level.CRITICAL,
54+
"Old data stream with a compatibility version < 8.0",
55+
"https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0",
56+
"This data stream has backing indices that were created before Elasticsearch 8.0.0",
57+
false,
58+
ofEntries(
59+
entry("reindex_required", true),
60+
entry("total_backing_indices", oldIndexCount + newIndexCount),
61+
entry("indices_requiring_upgrade_count", expectedIndices.size()),
62+
entry("indices_requiring_upgrade", expectedIndices)
63+
)
64+
);
6765

68-
for (int i = 0; i < newIndexCount; i++) {
69-
Settings.Builder settingsBuilder = settings(IndexVersion.current());
70-
IndexMetadata newIndexMetadata = IndexMetadata.builder("new-data-stream-index-" + i)
71-
.settings(settingsBuilder)
72-
.numberOfShards(1)
73-
.numberOfReplicas(0)
74-
.build();
75-
allIndices.add(newIndexMetadata.getIndex());
76-
nameToIndexMetadata.put(newIndexMetadata.getIndex().getName(), newIndexMetadata);
77-
}
66+
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState));
7867

79-
DataStream dataStream = new DataStream(
80-
randomAlphaOfLength(10),
81-
allIndices,
82-
randomNonNegativeLong(),
83-
Map.of(),
84-
randomBoolean(),
85-
false,
86-
false,
87-
randomBoolean(),
88-
randomFrom(IndexMode.values()),
89-
null,
90-
randomFrom(DataStreamOptions.EMPTY, DataStreamOptions.FAILURE_STORE_DISABLED, DataStreamOptions.FAILURE_STORE_ENABLED, null),
91-
List.of(),
92-
randomBoolean(),
93-
null
68+
assertThat(issues, equalTo(singletonList(expected)));
69+
}
70+
71+
public void testOldIndicesCheckWithOnlyClosedOrNewIndices() {
72+
// This tests what happens when any old indices that we have are closed. We expect no deprecation warning.
73+
int oldClosedIndexCount = randomIntBetween(1, 100);
74+
int newOpenIndexCount = randomIntBetween(0, 100);
75+
int newClosedIndexCount = randomIntBetween(0, 100);
76+
77+
Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
78+
Set<String> expectedIndices = new HashSet<>();
79+
80+
DataStream dataStream = createTestDataStream(
81+
0,
82+
oldClosedIndexCount,
83+
newOpenIndexCount,
84+
newClosedIndexCount,
85+
nameToIndexMetadata,
86+
expectedIndices
87+
);
88+
89+
Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
90+
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
91+
92+
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState));
93+
94+
assertThat(issues.size(), equalTo(0));
95+
}
96+
97+
public void testOldIndicesCheckWithClosedAndOpenIndices() {
98+
/*
99+
* This tests what happens when a data stream has old indices, and some are open and some are closed. We expect a deprecation
100+
* warning that includes information about the old ones only.
101+
*/
102+
int oldOpenIndexCount = randomIntBetween(1, 100);
103+
int oldClosedIndexCount = randomIntBetween(1, 100);
104+
int newOpenIndexCount = randomIntBetween(0, 100);
105+
int newClosedIndexCount = randomIntBetween(0, 100);
106+
107+
Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
108+
Set<String> expectedIndices = new HashSet<>();
109+
110+
DataStream dataStream = createTestDataStream(
111+
oldOpenIndexCount,
112+
oldClosedIndexCount,
113+
newOpenIndexCount,
114+
newClosedIndexCount,
115+
nameToIndexMetadata,
116+
expectedIndices
94117
);
95118

96119
Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
@@ -104,7 +127,7 @@ public void testOldIndicesCheck() {
104127
false,
105128
ofEntries(
106129
entry("reindex_required", true),
107-
entry("total_backing_indices", oldIndexCount + newIndexCount),
130+
entry("total_backing_indices", oldOpenIndexCount + oldClosedIndexCount + newOpenIndexCount + newClosedIndexCount),
108131
entry("indices_requiring_upgrade_count", expectedIndices.size()),
109132
entry("indices_requiring_upgrade", expectedIndices)
110133
)
@@ -115,4 +138,90 @@ public void testOldIndicesCheck() {
115138
assertThat(issues, equalTo(singletonList(expected)));
116139
}
117140

141+
/*
142+
* This creates a test DataStream with the given counts. The nameToIndexMetadata Map and the expectedIndices Set are mutable collections
143+
* that will be populated by this method.
144+
*/
145+
private DataStream createTestDataStream(
146+
int oldOpenIndexCount,
147+
int oldClosedIndexCount,
148+
int newOpenIndexCount,
149+
int newClosedIndexCount,
150+
Map<String, IndexMetadata> nameToIndexMetadata,
151+
Set<String> expectedIndices
152+
) {
153+
List<Index> allIndices = new ArrayList<>();
154+
155+
for (int i = 0; i < oldOpenIndexCount; i++) {
156+
allIndices.add(createOldIndex(i, false, nameToIndexMetadata, expectedIndices));
157+
}
158+
for (int i = 0; i < oldClosedIndexCount; i++) {
159+
allIndices.add(createOldIndex(i, true, nameToIndexMetadata, null));
160+
}
161+
for (int i = 0; i < newOpenIndexCount; i++) {
162+
allIndices.add(createNewIndex(i, false, nameToIndexMetadata));
163+
}
164+
for (int i = 0; i < newClosedIndexCount; i++) {
165+
allIndices.add(createNewIndex(i, true, nameToIndexMetadata));
166+
}
167+
168+
DataStream dataStream = new DataStream(
169+
randomAlphaOfLength(10),
170+
allIndices,
171+
randomNonNegativeLong(),
172+
Map.of(),
173+
randomBoolean(),
174+
false,
175+
false,
176+
randomBoolean(),
177+
randomFrom(IndexMode.values()),
178+
null,
179+
randomFrom(DataStreamOptions.EMPTY, DataStreamOptions.FAILURE_STORE_DISABLED, DataStreamOptions.FAILURE_STORE_ENABLED, null),
180+
List.of(),
181+
randomBoolean(),
182+
null
183+
);
184+
return dataStream;
185+
}
186+
187+
private Index createOldIndex(
188+
int suffix,
189+
boolean isClosed,
190+
Map<String, IndexMetadata> nameToIndexMetadata,
191+
Set<String> expectedIndices
192+
) {
193+
return createIndex(true, suffix, isClosed, nameToIndexMetadata, expectedIndices);
194+
}
195+
196+
private Index createNewIndex(int suffix, boolean isClosed, Map<String, IndexMetadata> nameToIndexMetadata) {
197+
return createIndex(false, suffix, isClosed, nameToIndexMetadata, null);
198+
}
199+
200+
private Index createIndex(
201+
boolean isOld,
202+
int suffix,
203+
boolean isClosed,
204+
Map<String, IndexMetadata> nameToIndexMetadata,
205+
Set<String> expectedIndices
206+
) {
207+
Settings.Builder settingsBuilder = isOld ? settings(IndexVersion.fromId(7170099)) : settings(IndexVersion.current());
208+
String indexName = (isOld ? "old-" : "new-") + (isClosed ? "closed-" : "") + "data-stream-index-" + suffix;
209+
if (isOld && isClosed == false) { // we only expect warnings on open old indices
210+
if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) {
211+
settingsBuilder.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
212+
} else {
213+
expectedIndices.add(indexName);
214+
}
215+
}
216+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
217+
.settings(settingsBuilder)
218+
.numberOfShards(1)
219+
.numberOfReplicas(0);
220+
if (isClosed) {
221+
indexMetadataBuilder.state(IndexMetadata.State.CLOSE);
222+
}
223+
IndexMetadata indexMetadata = indexMetadataBuilder.build();
224+
nameToIndexMetadata.put(indexMetadata.getIndex().getName(), indexMetadata);
225+
return indexMetadata.getIndex();
226+
}
118227
}

x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,22 @@ public void testOldIndicesCheckSnapshotIgnored() {
116116
assertThat(issues, empty());
117117
}
118118

119+
public void testOldIndicesCheckClosedIgnored() {
120+
IndexVersion createdWith = IndexVersion.fromId(7170099);
121+
Settings.Builder settings = settings(createdWith);
122+
IndexMetadata indexMetadata = IndexMetadata.builder("test")
123+
.settings(settings)
124+
.numberOfShards(1)
125+
.numberOfReplicas(0)
126+
.state(IndexMetadata.State.CLOSE)
127+
.build();
128+
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
129+
.metadata(Metadata.builder().put(indexMetadata, true))
130+
.build();
131+
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState));
132+
assertThat(issues, empty());
133+
}
134+
119135
public void testTranslogRetentionSettings() {
120136
Settings.Builder settings = settings(IndexVersion.current());
121137
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.nio.charset.StandardCharsets;
2626
import java.time.Instant;
27+
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Set;
@@ -247,16 +248,27 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
247248
assertOK(client().performRequest(putIndexTemplateRequest));
248249
bulkLoadData(dataStreamName);
249250
for (int i = 0; i < numRollovers; i++) {
250-
rollover(dataStreamName);
251+
String oldIndexName = rollover(dataStreamName);
252+
if (randomBoolean()) {
253+
closeIndex(oldIndexName);
254+
}
251255
bulkLoadData(dataStreamName);
252256
}
253257
}
254258

255259
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
256260
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
261+
Set<String> closedOldIndices = getClosedIndices(dataStreamName);
257262
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
258263
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
259-
rollover(dataStreamName);
264+
String oldIndexName = rollover(dataStreamName);
265+
if (randomBoolean()) {
266+
if (i == 0) {
267+
// Since this is the first rollover on the new cluster, the old index came from the old cluster
268+
closedOldIndices.add(oldIndexName);
269+
}
270+
closeIndex(oldIndexName);
271+
}
260272
}
261273
Request reindexRequest = new Request("POST", "/_migration/reindex");
262274
reindexRequest.setJsonEntity(Strings.format("""
@@ -299,12 +311,14 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
299311
*/
300312
assertThat(
301313
statusResponseMap.get("total_indices_requiring_upgrade"),
302-
equalTo(originalWriteIndex + numRolloversOnOldCluster)
314+
equalTo(originalWriteIndex + numRolloversOnOldCluster - closedOldIndices.size())
303315
);
304-
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
316+
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1 - closedOldIndices.size()));
305317
// We expect all the original indices to have been deleted
306318
for (String oldIndex : indicesNeedingUpgrade) {
307-
assertThat(indexExists(oldIndex), equalTo(false));
319+
if (closedOldIndices.contains(oldIndex) == false) {
320+
assertThat(indexExists(oldIndex), equalTo(false));
321+
}
308322
}
309323
assertThat(getDataStreamIndices(dataStreamName).size(), equalTo(expectedTotalIndicesInDataStream));
310324
}
@@ -324,6 +338,29 @@ private Set<String> getDataStreamIndices(String dataStreamName) throws IOExcepti
324338
return indices.stream().map(index -> index.get("index_name").toString()).collect(Collectors.toSet());
325339
}
326340

341+
@SuppressWarnings("unchecked")
342+
private Set<String> getClosedIndices(String dataStreamName) throws IOException {
343+
Set<String> allIndices = getDataStreamIndices(dataStreamName);
344+
Set<String> closedIndices = new HashSet<>();
345+
Response response = client().performRequest(new Request("GET", "_cluster/state/blocks/indices"));
346+
Map<String, Object> responseMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
347+
Map<String, Object> blocks = (Map<String, Object>) responseMap.get("blocks");
348+
Map<String, Object> indices = (Map<String, Object>) blocks.get("indices");
349+
for (Map.Entry<String, Object> indexEntry : indices.entrySet()) {
350+
String indexName = indexEntry.getKey();
351+
if (allIndices.contains(indexName)) {
352+
Map<String, Object> blocksForIndex = (Map<String, Object>) indexEntry.getValue();
353+
for (Map.Entry<String, Object> blockEntry : blocksForIndex.entrySet()) {
354+
Map<String, String> block = (Map<String, String>) blockEntry.getValue();
355+
if ("index closed".equals(block.get("description"))) {
356+
closedIndices.add(indexName);
357+
}
358+
}
359+
}
360+
}
361+
return closedIndices;
362+
}
363+
327364
/*
328365
* Similar to isOriginalClusterCurrent, but returns true if the major versions of the clusters are the same. So true
329366
* for 8.6 and 8.17, but false for 7.17 and 8.18.
@@ -365,9 +402,11 @@ static String formatInstant(Instant instant) {
365402
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
366403
}
367404

368-
private static void rollover(String dataStreamName) throws IOException {
405+
private static String rollover(String dataStreamName) throws IOException {
369406
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
370407
Response rolloverResponse = client().performRequest(rolloverRequest);
371408
assertOK(rolloverResponse);
409+
String oldIndexName = (String) entityAsMap(rolloverResponse).get("old_index");
410+
return oldIndexName;
372411
}
373412
}

0 commit comments

Comments
 (0)