Skip to content

Commit 10ac063

Browse files
authored
[DSL Global Retention] Calculate and use global retention in DSL (#106268)
1 parent 2a2e648 commit 10ac063

File tree

7 files changed

+235
-47
lines changed

7 files changed

+235
-47
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.cluster.SimpleBatchedExecutor;
4343
import org.elasticsearch.cluster.block.ClusterBlockLevel;
4444
import org.elasticsearch.cluster.metadata.DataStream;
45+
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
4546
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
4647
import org.elasticsearch.cluster.metadata.IndexAbstraction;
4748
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -796,7 +797,7 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
796797
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
797798
rolloverConfiguration,
798799
dataStream.getName(),
799-
dataStream.getLifecycle().getEffectiveDataRetention()
800+
dataStream.getLifecycle().getEffectiveDataRetention(DataStreamGlobalRetention.getFromClusterState(state))
800801
);
801802
transportActionsDeduplicator.executeOnce(
802803
rolloverRequest,
@@ -823,14 +824,15 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
823824
*/
824825
private Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
825826
Metadata metadata = state.metadata();
826-
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);
827+
DataStreamGlobalRetention globalRetention = DataStreamGlobalRetention.getFromClusterState(state);
828+
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention);
827829
if (backingIndicesOlderThanRetention.isEmpty()) {
828830
return Set.of();
829831
}
830832
Set<Index> indicesToBeRemoved = new HashSet<>();
831833
// We know that there is lifecycle and retention because there are indices to be deleted
832834
assert dataStream.getLifecycle() != null;
833-
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention();
835+
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention);
834836
for (Index index : backingIndicesOlderThanRetention) {
835837
if (indicesToExcludeForRemainingRun.contains(index) == false) {
836838
IndexMetadata backingIndex = metadata.index(index);

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -759,13 +759,17 @@ public DataStream snapshot(Collection<String> indicesInSnapshot) {
759759
* NOTE that this specifically does not return the write index of the data stream as usually retention
760760
* is treated differently for the write index (i.e. they first need to be rolled over)
761761
*/
762-
public List<Index> getIndicesPastRetention(Function<String, IndexMetadata> indexMetadataSupplier, LongSupplier nowSupplier) {
763-
if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention() == null) {
762+
public List<Index> getIndicesPastRetention(
763+
Function<String, IndexMetadata> indexMetadataSupplier,
764+
LongSupplier nowSupplier,
765+
DataStreamGlobalRetention globalRetention
766+
) {
767+
if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention(globalRetention) == null) {
764768
return List.of();
765769
}
766770

767771
List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
768-
lifecycle.getEffectiveDataRetention(),
772+
lifecycle.getEffectiveDataRetention(globalRetention),
769773
indexMetadataSupplier,
770774
this::isIndexManagedByDataStreamLifecycle,
771775
nowSupplier
@@ -1098,14 +1102,18 @@ public static DataStream fromXContent(XContentParser parser) throws IOException
10981102

10991103
@Override
11001104
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
1101-
return toXContent(builder, params, null);
1105+
return toXContent(builder, params, null, null);
11021106
}
11031107

11041108
/**
11051109
* Converts the data stream to XContent and passes the RolloverConditions, when provided, to the lifecycle.
11061110
*/
1107-
public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration)
1108-
throws IOException {
1111+
public XContentBuilder toXContent(
1112+
XContentBuilder builder,
1113+
Params params,
1114+
@Nullable RolloverConfiguration rolloverConfiguration,
1115+
@Nullable DataStreamGlobalRetention globalRetention
1116+
) throws IOException {
11091117
builder.startObject();
11101118
builder.field(NAME_FIELD.getPreferredName(), name);
11111119
builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName())
@@ -1132,7 +1140,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nulla
11321140
}
11331141
if (lifecycle != null) {
11341142
builder.field(LIFECYCLE.getPreferredName());
1135-
lifecycle.toXContent(builder, params, rolloverConfiguration);
1143+
lifecycle.toXContent(builder, params, rolloverConfiguration, globalRetention);
11361144
}
11371145
builder.field(ROLLOVER_ON_WRITE_FIELD.getPreferredName(), rolloverOnWrite);
11381146
if (autoShardingEvent != null) {

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.settings.Settings;
2323
import org.elasticsearch.core.Nullable;
2424
import org.elasticsearch.core.TimeValue;
25+
import org.elasticsearch.core.Tuple;
2526
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
2627
import org.elasticsearch.xcontent.AbstractObjectParser;
2728
import org.elasticsearch.xcontent.ConstructingObjectParser;
@@ -34,6 +35,7 @@
3435

3536
import java.io.IOException;
3637
import java.util.List;
38+
import java.util.Locale;
3739
import java.util.Objects;
3840

3941
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
@@ -131,15 +133,52 @@ public boolean isEnabled() {
131133
/**
132134
* The least amount of time data should be kept by elasticsearch.
133135
* @return the time period or null, null represents that data should never be deleted.
136+
* @deprecated use {@link #getEffectiveDataRetention(DataStreamGlobalRetention)}
134137
*/
138+
@Deprecated
135139
@Nullable
136140
public TimeValue getEffectiveDataRetention() {
137-
return getDataStreamRetention();
141+
return getEffectiveDataRetention(null);
142+
}
143+
144+
/**
145+
* The least amount of time data should be kept by elasticsearch.
146+
* @return the time period or null, null represents that data should never be deleted.
147+
*/
148+
@Nullable
149+
public TimeValue getEffectiveDataRetention(@Nullable DataStreamGlobalRetention globalRetention) {
150+
return getEffectiveDataRetentionWithSource(globalRetention).v1();
151+
}
152+
153+
/**
154+
* The least amount of time data should be kept by elasticsearch.
155+
* @return the time period or null, null represents that data should never be deleted.
156+
*/
157+
@Nullable
158+
public Tuple<TimeValue, RetentionSource> getEffectiveDataRetentionWithSource(@Nullable DataStreamGlobalRetention globalRetention) {
159+
// If lifecycle is disabled there is no effective retention
160+
if (enabled == false) {
161+
return Tuple.tuple(null, RetentionSource.DATA_STREAM_CONFIGURATION);
162+
}
163+
var dataStreamRetention = getDataStreamRetention();
164+
if (globalRetention == null) {
165+
return Tuple.tuple(dataStreamRetention, RetentionSource.DATA_STREAM_CONFIGURATION);
166+
}
167+
if (dataStreamRetention == null) {
168+
return globalRetention.getDefaultRetention() != null
169+
? Tuple.tuple(globalRetention.getDefaultRetention(), RetentionSource.DEFAULT_GLOBAL_RETENTION)
170+
: Tuple.tuple(globalRetention.getMaxRetention(), RetentionSource.MAX_GLOBAL_RETENTION);
171+
}
172+
if (globalRetention.getMaxRetention() != null && globalRetention.getMaxRetention().getMillis() < dataStreamRetention.getMillis()) {
173+
return Tuple.tuple(globalRetention.getMaxRetention(), RetentionSource.MAX_GLOBAL_RETENTION);
174+
} else {
175+
return Tuple.tuple(dataStreamRetention, RetentionSource.DATA_STREAM_CONFIGURATION);
176+
}
138177
}
139178

140179
/**
141180
* The least amount of time data the data stream is requesting es to keep the data.
142-
* NOTE: this can be overriden by the {@link DataStreamLifecycle#getEffectiveDataRetention()}.
181+
* NOTE: this can be overridden by the {@link DataStreamLifecycle#getEffectiveDataRetention(DataStreamGlobalRetention)}.
143182
* @return the time period or null, null represents that data should never be deleted.
144183
*/
145184
@Nullable
@@ -232,14 +271,28 @@ public String toString() {
232271

233272
@Override
234273
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
235-
return toXContent(builder, params, null);
274+
return toXContent(builder, params, null, null);
236275
}
237276

238277
/**
239278
* Converts the data stream lifecycle to XContent and injects the RolloverConditions if they exist.
279+
* @deprecated use {@link #toXContent(XContentBuilder, Params, RolloverConfiguration, DataStreamGlobalRetention)}
240280
*/
281+
@Deprecated
241282
public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration)
242283
throws IOException {
284+
return toXContent(builder, params, rolloverConfiguration, null);
285+
}
286+
287+
/**
288+
* Converts the data stream lifecycle to XContent and injects the RolloverConditions and the global retention if they exist.
289+
*/
290+
public XContentBuilder toXContent(
291+
XContentBuilder builder,
292+
Params params,
293+
@Nullable RolloverConfiguration rolloverConfiguration,
294+
@Nullable DataStreamGlobalRetention globalRetention
295+
) throws IOException {
243296
builder.startObject();
244297
builder.field(ENABLED_FIELD.getPreferredName(), enabled);
245298
if (dataRetention != null) {
@@ -255,7 +308,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nulla
255308
}
256309
if (rolloverConfiguration != null) {
257310
builder.field(ROLLOVER_FIELD.getPreferredName());
258-
rolloverConfiguration.evaluateAndConvertToXContent(builder, params, getEffectiveDataRetention());
311+
rolloverConfiguration.evaluateAndConvertToXContent(builder, params, getEffectiveDataRetention(globalRetention));
259312
}
260313
builder.endObject();
261314
return builder;
@@ -466,4 +519,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
466519
return builder;
467520
}
468521
}
522+
523+
/**
524+
* This enum represents all configuration sources that can influence the retention of a data stream.
525+
*/
526+
public enum RetentionSource {
527+
DATA_STREAM_CONFIGURATION,
528+
DEFAULT_GLOBAL_RETENTION,
529+
MAX_GLOBAL_RETENTION;
530+
531+
public String displayName() {
532+
return this.toString().toLowerCase(Locale.ROOT);
533+
}
534+
}
469535
}

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.unit.ByteSizeValue;
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.core.Tuple;
2223
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
2324
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
2425
import org.elasticsearch.test.ESTestCase;
@@ -33,6 +34,9 @@
3334
import java.util.Set;
3435
import java.util.stream.Stream;
3536

37+
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DATA_STREAM_CONFIGURATION;
38+
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION;
39+
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION;
3640
import static org.hamcrest.Matchers.containsString;
3741
import static org.hamcrest.Matchers.equalTo;
3842
import static org.hamcrest.Matchers.nullValue;
@@ -107,10 +111,11 @@ public void testXContentSerializationWithRollover() throws IOException {
107111
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
108112
builder.humanReadable(true);
109113
RolloverConfiguration rolloverConfiguration = RolloverConfigurationTests.randomRolloverConditions();
110-
lifecycle.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration);
114+
DataStreamGlobalRetention globalRetention = DataStreamGlobalRetentionSerializationTests.randomGlobalRetention();
115+
lifecycle.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration, globalRetention);
111116
String serialized = Strings.toString(builder);
112117
assertThat(serialized, containsString("rollover"));
113-
for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention())
118+
for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention(globalRetention))
114119
.getConditions()
115120
.keySet()) {
116121
assertThat(serialized, containsString(label));
@@ -253,6 +258,72 @@ public void testInvalidDownsamplingConfiguration() {
253258
}
254259
}
255260

261+
public void testEffectiveRetention() {
262+
// No retention in the data stream lifecycle
263+
{
264+
DataStreamLifecycle noRetentionLifecycle = DataStreamLifecycle.newBuilder().downsampling(randomDownsampling()).build();
265+
TimeValue maxRetention = TimeValue.timeValueDays(randomIntBetween(50, 100));
266+
TimeValue defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 50));
267+
Tuple<TimeValue, DataStreamLifecycle.RetentionSource> effectiveDataRetentionWithSource = noRetentionLifecycle
268+
.getEffectiveDataRetentionWithSource(null);
269+
assertThat(effectiveDataRetentionWithSource.v1(), nullValue());
270+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));
271+
272+
effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource(
273+
new DataStreamGlobalRetention(null, maxRetention)
274+
);
275+
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(maxRetention));
276+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION));
277+
278+
effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource(
279+
new DataStreamGlobalRetention(defaultRetention, null)
280+
);
281+
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(defaultRetention));
282+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));
283+
284+
effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource(
285+
new DataStreamGlobalRetention(defaultRetention, maxRetention)
286+
);
287+
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(defaultRetention));
288+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));
289+
}
290+
291+
// With retention in the data stream lifecycle
292+
{
293+
TimeValue dataStreamRetention = TimeValue.timeValueDays(randomIntBetween(5, 100));
294+
DataStreamLifecycle lifecycleRetention = DataStreamLifecycle.newBuilder()
295+
.dataRetention(dataStreamRetention)
296+
.downsampling(randomDownsampling())
297+
.build();
298+
TimeValue defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, (int) dataStreamRetention.getDays() - 1));
299+
300+
Tuple<TimeValue, DataStreamLifecycle.RetentionSource> effectiveDataRetentionWithSource = lifecycleRetention
301+
.getEffectiveDataRetentionWithSource(null);
302+
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention));
303+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));
304+
305+
effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource(
306+
new DataStreamGlobalRetention(defaultRetention, null)
307+
);
308+
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention));
309+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));
310+
311+
TimeValue maxGlobalRetention = randomBoolean() ? dataStreamRetention : TimeValue.timeValueDays(dataStreamRetention.days() + 1);
312+
effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource(
313+
new DataStreamGlobalRetention(defaultRetention, maxGlobalRetention)
314+
);
315+
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention));
316+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));
317+
318+
TimeValue maxRetentionLessThanDataStream = TimeValue.timeValueDays(dataStreamRetention.days() - 1);
319+
effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource(
320+
new DataStreamGlobalRetention(randomBoolean() ? null : TimeValue.timeValueDays(10), maxRetentionLessThanDataStream)
321+
);
322+
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(maxRetentionLessThanDataStream));
323+
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION));
324+
}
325+
}
326+
256327
@Nullable
257328
public static DataStreamLifecycle randomLifecycle() {
258329
return DataStreamLifecycle.newBuilder()

0 commit comments

Comments
 (0)