Skip to content

Commit 390818d

Browse files
committed
Account for replicated data streams more completely.
1 parent e2025f6 commit 390818d

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,12 @@ public DataStream(
214214
lifecycle,
215215
dataStreamOptions,
216216
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
217-
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), failureIndices.isEmpty(), null)
217+
new DataStreamIndices(
218+
FAILURE_STORE_PREFIX,
219+
List.copyOf(failureIndices),
220+
(replicated == false && failureIndices.isEmpty()),
221+
null
222+
)
218223
);
219224
}
220225

@@ -283,15 +288,15 @@ public static DataStream read(StreamInput in) throws IOException {
283288
backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
284289
}
285290
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
286-
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty
287-
boolean failureStoreRolloverOnWrite = in.readBoolean();
288-
failureStoreRolloverOnWrite |= failureIndices.isEmpty();
291+
// Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
292+
boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
289293
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
290294
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
291295
} else {
292296
// If we are reading from an older version that does not have these fields, just default
293297
// to a reasonable value for rollover on write for the failure store
294-
failureIndicesBuilder.setRolloverOnWrite(failureIndices.isEmpty());
298+
boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
299+
failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
295300
}
296301
DataStreamOptions dataStreamOptions;
297302
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
@@ -1497,9 +1502,11 @@ public void writeTo(StreamOutput out) throws IOException {
14971502
new DataStreamIndices(
14981503
FAILURE_STORE_PREFIX,
14991504
args[13] != null ? (List<Index>) args[13] : List.of(),
1500-
// We check if the list of failure indices are empty first, forcing rollover on write to true, and if they have entries,
1501-
// then we use whatever value was previously present.
1502-
(args[13] != null && ((List<Index>) args[13]).isEmpty()) || (args[14] != null && (boolean) args[14]),
1505+
// If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
1506+
// exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
1507+
// then use the rollover on write value (args[14]) present in the parser.
1508+
((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
1509+
|| (args[14] != null && (boolean) args[14]),
15031510
(DataStreamAutoShardingEvent) args[15]
15041511
)
15051512
)

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time
391391
)
392392
.build(),
393393
DataStream.DataStreamIndices.failureIndicesBuilder(failureIndices)
394-
.setRolloverOnWrite(failureStore && replicated == false && randomBoolean())
394+
.setRolloverOnWrite(replicated == false && (failureIndices.isEmpty() || randomBoolean()))
395395
.setAutoShardingEvent(
396396
failureStore && randomBoolean()
397397
? new DataStreamAutoShardingEvent(

0 commit comments

Comments
 (0)