Skip to content

Commit 2ee7ca4

Browse files
authored
[Failure store] Add failure index retrieval to IndexAbstraction (elastic#119413)
1 parent 4a2abab commit 2ee7ca4

File tree

61 files changed

+596
-442
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+596
-442
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1925,7 +1925,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
19251925
DataStream original = currentState.getMetadata().dataStreams().get(dataStreamName);
19261926
DataStream broken = original.copy()
19271927
.setBackingIndices(
1928-
original.getBackingIndices()
1928+
original.getDataComponent()
19291929
.copy()
19301930
.setIndices(
19311931
List.of(new Index(original.getIndices().get(0).getName(), "broken"), original.getIndices().get(1))

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void setup() throws Exception {
148148
dsBackingIndexName = dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName();
149149
otherDsBackingIndexName = dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName();
150150
fsBackingIndexName = dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName();
151-
fsFailureIndexName = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName();
151+
fsFailureIndexName = dataStreamInfos.get(2).getDataStream().getFailureIndices().get(0).getName();
152152

153153
// Will be used in some tests, to test renaming while restoring a snapshot:
154154
ds2BackingIndexName = dsBackingIndexName.replace("-ds-", "-ds2-");
@@ -279,7 +279,7 @@ public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
279279
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(otherDsBackingIndexName));
280280
backingIndices = dataStreamInfos.get(2).getDataStream().getIndices();
281281
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsBackingIndexName));
282-
List<Index> failureIndices = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices();
282+
List<Index> failureIndices = dataStreamInfos.get(2).getDataStream().getFailureIndices();
283283
assertThat(failureIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsFailureIndexName));
284284
}
285285

@@ -375,7 +375,7 @@ public void testFailureStoreSnapshotAndRestore() throws Exception {
375375
assertEquals(1, dataStreamInfos.size());
376376
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
377377
assertEquals(fsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
378-
assertEquals(fsFailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
378+
assertEquals(fsFailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().get(0).getName());
379379
}
380380
{
381381
// With rename pattern
@@ -394,7 +394,7 @@ public void testFailureStoreSnapshotAndRestore() throws Exception {
394394
assertEquals(1, dataStreamInfos.size());
395395
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
396396
assertEquals(fs2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
397-
assertEquals(fs2FailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
397+
assertEquals(fs2FailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().get(0).getName());
398398
}
399399
}
400400

@@ -587,8 +587,8 @@ public void testSnapshotAndRestoreAll() throws Exception {
587587
assertEquals(otherDsBackingIndexName, dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName());
588588
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
589589
assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName());
590-
assertEquals(1, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().size());
591-
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
590+
assertEquals(1, dataStreamInfos.get(2).getDataStream().getFailureIndices().size());
591+
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().get(0).getName());
592592

593593
GetAliasesResponse getAliasesResponse = client.admin()
594594
.indices()
@@ -659,7 +659,7 @@ public void testSnapshotAndRestoreIncludeAliasesFalse() throws Exception {
659659
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
660660
assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName());
661661
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
662-
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
662+
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().get(0).getName());
663663

664664
GetAliasesResponse getAliasesResponse = client.admin()
665665
.indices()
@@ -1257,8 +1257,8 @@ public void testExcludeDSFromSnapshotWhenExcludingAnyOfItsIndices() {
12571257
assertThat(restoreSnapshotResponse.failedShards(), is(0));
12581258

12591259
GetDataStreamAction.Response.DataStreamInfo dataStream = getDataStreamInfo(dataStreamName).getFirst();
1260-
assertThat(dataStream.getDataStream().getBackingIndices().getIndices(), not(empty()));
1261-
assertThat(dataStream.getDataStream().getFailureIndices().getIndices(), empty());
1260+
assertThat(dataStream.getDataStream().getDataComponent().getIndices(), not(empty()));
1261+
assertThat(dataStream.getDataStream().getFailureIndices(), empty());
12621262
}
12631263
}
12641264

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedE
5858
// Verify that the data stream is marked for rollover and that it has currently one index
5959
DataStream dataStream = getDataStream(dataStreamName);
6060
assertThat(dataStream.rolloverOnWrite(), equalTo(true));
61-
assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(1));
61+
assertThat(dataStream.getDataComponent().getIndices().size(), equalTo(1));
6262

6363
// Introduce a disruption to the master node that should delay the rollover execution
6464
final var barrier = new CyclicBarrier(2);
@@ -107,7 +107,7 @@ public void onFailure(Exception e) {
107107
// Verify that the rollover has happened once
108108
dataStream = getDataStream(dataStreamName);
109109
assertThat(dataStream.rolloverOnWrite(), equalTo(false));
110-
assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(2));
110+
assertThat(dataStream.getDataComponent().getIndices().size(), equalTo(2));
111111
}
112112

113113
private DataStream getDataStream(String dataStreamName) {

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LogsDataStreamIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ private void assertDataStreamBackingIndicesModes(final String dataStreamName, fi
285285
final GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
286286
.actionGet();
287287
final DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
288-
final DataStream.DataStreamIndices backingIndices = dataStream.getBackingIndices();
288+
final DataStream.DataStreamIndices backingIndices = dataStream.getDataComponent();
289289
final Iterator<IndexMode> indexModesIterator = modes.iterator();
290290
assertThat(backingIndices.getIndices().size(), Matchers.equalTo(modes.size()));
291291
for (final Index index : backingIndices.getIndices()) {

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,7 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
10841084
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
10851085
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
10861086
assertThat(backingIndices.size(), equalTo(1));
1087-
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices();
1087+
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
10881088
assertThat(failureIndices.size(), equalTo(2));
10891089
});
10901090

@@ -1129,7 +1129,7 @@ public void testLifecycleAppliedToFailureStore() throws Exception {
11291129
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
11301130
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
11311131
assertThat(backingIndices.size(), equalTo(1));
1132-
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices();
1132+
List<Index> failureIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices();
11331133
assertThat(failureIndices.size(), equalTo(1));
11341134
assertThat(failureIndices.get(0).getName(), equalTo(secondGenerationIndex));
11351135
});
@@ -1156,14 +1156,7 @@ private static List<String> getFailureIndices(String dataStreamName) {
11561156
.actionGet();
11571157
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
11581158
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
1159-
return getDataStreamResponse.getDataStreams()
1160-
.get(0)
1161-
.getDataStream()
1162-
.getFailureIndices()
1163-
.getIndices()
1164-
.stream()
1165-
.map(Index::getName)
1166-
.toList();
1159+
return getDataStreamResponse.getDataStreams().get(0).getDataStream().getFailureIndices().stream().map(Index::getName).toList();
11671160
}
11681161

11691162
static void indexDocs(String dataStream, int numDocs) {

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsStatsTransportAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,11 @@ protected DataStreamsStatsAction.DataStreamShardStats readShardResult(StreamInpu
171171
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
172172
DataStream dataStream = (DataStream) indexAbstraction;
173173
AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats());
174-
dataStream.getBackingIndices().getIndices().stream().map(Index::getName).forEach(index -> {
174+
dataStream.getIndices().stream().map(Index::getName).forEach(index -> {
175175
stats.backingIndices.add(index);
176176
allBackingIndices.add(index);
177177
});
178-
dataStream.getFailureIndices().getIndices().stream().map(Index::getName).forEach(index -> {
178+
dataStream.getFailureIndices().stream().map(Index::getName).forEach(index -> {
179179
stats.backingIndices.add(index);
180180
allBackingIndices.add(index);
181181
});

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static ClusterState removeDataStream(
156156
DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
157157
assert dataStream != null;
158158
backingIndicesToRemove.addAll(dataStream.getIndices());
159-
backingIndicesToRemove.addAll(dataStream.getFailureIndices().getIndices());
159+
backingIndicesToRemove.addAll(dataStream.getFailureIndices());
160160
}
161161

162162
// first delete the data streams and then the indices:

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ static GetDataStreamAction.Response innerOperation(
207207
Map<Index, IndexProperties> backingIndicesSettingsValues = new HashMap<>();
208208
Metadata metadata = state.getMetadata();
209209
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getIndices());
210-
if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().getIndices().isEmpty() == false) {
211-
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getFailureIndices().getIndices());
210+
if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().isEmpty() == false) {
211+
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getFailureIndices());
212212
}
213213

214214
GetDataStreamAction.Response.TimeSeries timeSeries = null;

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -762,10 +762,8 @@ static List<Index> getTargetIndices(
762762
targetIndices.add(index);
763763
}
764764
}
765-
if (withFailureStore
766-
&& DataStream.isFailureStoreFeatureFlagEnabled()
767-
&& dataStream.getFailureIndices().getIndices().isEmpty() == false) {
768-
for (Index index : dataStream.getFailureIndices().getIndices()) {
765+
if (withFailureStore && DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().isEmpty() == false) {
766+
for (Index index : dataStream.getFailureIndices()) {
769767
if (dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier)
770768
&& indicesToExcludeForRemainingRun.contains(index) == false) {
771769
targetIndices.add(index);
@@ -820,7 +818,7 @@ private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStrea
820818

821819
@Nullable
822820
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
823-
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getFailureStoreWriteIndex() : dataStream.getWriteIndex();
821+
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
824822
if (currentRunWriteIndex == null) {
825823
return null;
826824
}

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void testUpdateTimeSeriesTemporalRange_NoUpdateBecauseReplicated() {
141141
).getMetadata();
142142
DataStream d = metadata.dataStreams().get(dataStreamName);
143143
metadata = Metadata.builder(metadata)
144-
.put(d.copy().setReplicated(true).setBackingIndices(d.getBackingIndices().copy().setRolloverOnWrite(false).build()).build())
144+
.put(d.copy().setReplicated(true).setBackingIndices(d.getDataComponent().copy().setRolloverOnWrite(false).build()).build())
145145
.build();
146146

147147
now = now.plus(1, ChronoUnit.HOURS);

0 commit comments

Comments
 (0)