Skip to content

Commit 88e49f0

Browse files
committed
Introduce new setting for default failure store
1 parent 03d7781 commit 88e49f0

File tree

6 files changed

+205
-16
lines changed

6 files changed

+205
-16
lines changed

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ public void testDataStreamRetention() throws Exception {
147147
@SuppressWarnings("unchecked")
148148
public void testDefaultRetention() throws Exception {
149149
// Set default global retention
150-
updateClusterSettings(Settings.builder().put("data_streams.lifecycle.retention.default", "10s").build());
150+
updateClusterSettings(
151+
Settings.builder()
152+
.put("data_streams.lifecycle.retention.default", "10s")
153+
.put("data_streams.lifecycle.retention.failures_default", "10s")
154+
.build()
155+
);
151156

152157
// Verify that the effective retention matches the default retention
153158
{
@@ -163,7 +168,7 @@ public void testDefaultRetention() throws Exception {
163168
assertThat(lifecycle.get("data_retention"), nullValue());
164169
Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
165170
assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
166-
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention"));
171+
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_failures_retention"));
167172
assertThat(failuresLifecycle.get("data_retention"), nullValue());
168173
}
169174

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
public record DataStreamGlobalRetention(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) implements Writeable {
2424

2525
public static final TimeValue MIN_RETENTION_VALUE = TimeValue.timeValueSeconds(10);
26+
public static final DataStreamGlobalRetention DEFAULT_FAILURES_CONFIG = new DataStreamGlobalRetention(
27+
DataStreamGlobalRetentionSettings.FAILURES_DEFAULT_RETENTION,
28+
null
29+
);
2630

2731
/**
2832
* @param defaultRetention the default retention or null if it's undefined
@@ -46,6 +50,21 @@ public DataStreamGlobalRetention(TimeValue defaultRetention, TimeValue maxRetent
4650
this.maxRetention = maxRetention;
4751
}
4852

53+
/**
54+
* Helper method that creates a global retention object or returns null in case both retentions are null
55+
*/
56+
@Nullable
57+
public static DataStreamGlobalRetention create(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) {
58+
if (defaultRetention == null && maxRetention == null) {
59+
return null;
60+
}
61+
if (maxRetention == null
62+
&& DataStreamGlobalRetentionSettings.FAILURES_DEFAULT_RETENTION.getMillis() == defaultRetention.getMillis()) {
63+
return DEFAULT_FAILURES_CONFIG;
64+
}
65+
return new DataStreamGlobalRetention(defaultRetention, maxRetention);
66+
}
67+
4968
private boolean validateRetentionValue(@Nullable TimeValue retention) {
5069
return retention == null || retention.getMillis() >= MIN_RETENTION_VALUE.getMillis();
5170
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.logging.log4j.Logger;
1414
import org.elasticsearch.common.settings.ClusterSettings;
1515
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.common.settings.Settings;
1617
import org.elasticsearch.core.Nullable;
1718
import org.elasticsearch.core.TimeValue;
1819

@@ -24,8 +25,10 @@
2425
* This class holds the data stream global retention settings. It defines, validates and monitors the settings.
2526
* <p>
2627
* The global retention settings apply to non-system data streams that are managed by the data stream lifecycle. They consist of:
27-
* - The default retention which applies to data streams that do not have a retention defined.
28-
* - The max retention which applies to all data streams that do not have retention or their retention has exceeded this value.
28+
* - The default retention which applies to the backing indices of data streams that do not have a retention defined.
29+
* - The max retention which applies to backing and failure indices of data streams that do not have retention or their
30+
* retention has exceeded this value.
31+
* - The failures default retention which applies to the failure indices of data streams that do not have a retention defined.
2932
*/
3033
public class DataStreamGlobalRetentionSettings {
3134

@@ -82,27 +85,66 @@ public Iterator<Setting<?>> settings() {
8285
Setting.Property.Dynamic
8386
);
8487

88+
static final TimeValue FAILURES_DEFAULT_RETENTION = TimeValue.timeValueDays(30);
89+
public static final Setting<TimeValue> FAILURE_STORE_DEFAULT_RETENTION_SETTING = Setting.timeSetting(
90+
"data_streams.lifecycle.retention.failures_default",
91+
FAILURES_DEFAULT_RETENTION,
92+
new Setting.Validator<>() {
93+
@Override
94+
public void validate(TimeValue value) {}
95+
96+
@Override
97+
public void validate(final TimeValue settingValue, final Map<Setting<?>, Object> settings) {
98+
TimeValue defaultRetention = getSettingValueOrNull(settingValue);
99+
// Currently, we do not validate the default for the failure store against the max because
100+
// we start with a default value that might conflict with the max retention.
101+
validateIsolatedRetentionValue(defaultRetention, FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey());
102+
}
103+
},
104+
Setting.Property.NodeScope,
105+
Setting.Property.Dynamic
106+
);
107+
85108
@Nullable
86109
private volatile TimeValue defaultRetention;
87110
@Nullable
88111
private volatile TimeValue maxRetention;
112+
@Nullable
113+
private volatile TimeValue failuresDefaultRetention;
114+
/** We cache the global retention objects; volatile is sufficient because we only write these values in the settings appliers, which
115+
* are executed by {@link org.elasticsearch.common.settings.AbstractScopedSettings#applySettings(Settings)} which is synchronised.
116+
*/
117+
@Nullable
118+
private volatile DataStreamGlobalRetention dataGlobalRetention;
119+
@Nullable
120+
private volatile DataStreamGlobalRetention failuresGlobalRetention;
89121

90122
private DataStreamGlobalRetentionSettings() {
91123

92124
}
93125

126+
/**
127+
* @return the max retention that applies to all data stream data
128+
*/
94129
@Nullable
95130
public TimeValue getMaxRetention() {
96131
return maxRetention;
97132
}
98133

134+
/**
135+
* @return the default retention that applies to the data component
136+
*/
99137
@Nullable
100138
public TimeValue getDefaultRetention() {
101139
return defaultRetention;
102140
}
103141

104-
public boolean areDefined() {
105-
return getDefaultRetention() != null || getMaxRetention() != null;
142+
/**
143+
* @return the default retention that applies either to the data or the failures component
144+
*/
145+
@Nullable
146+
public TimeValue getDefaultRetention(boolean failureStore) {
147+
return failureStore ? failuresDefaultRetention : defaultRetention;
106148
}
107149

108150
/**
@@ -113,17 +155,33 @@ public static DataStreamGlobalRetentionSettings create(ClusterSettings clusterSe
113155
DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = new DataStreamGlobalRetentionSettings();
114156
clusterSettings.initializeAndWatch(DATA_STREAMS_DEFAULT_RETENTION_SETTING, dataStreamGlobalRetentionSettings::setDefaultRetention);
115157
clusterSettings.initializeAndWatch(DATA_STREAMS_MAX_RETENTION_SETTING, dataStreamGlobalRetentionSettings::setMaxRetention);
158+
clusterSettings.initializeAndWatch(
159+
FAILURE_STORE_DEFAULT_RETENTION_SETTING,
160+
dataStreamGlobalRetentionSettings::setFailuresDefaultRetention
161+
);
116162
return dataStreamGlobalRetentionSettings;
117163
}
118164

119165
private void setMaxRetention(TimeValue maxRetention) {
120166
this.maxRetention = getSettingValueOrNull(maxRetention);
121-
logger.info("Updated max factory retention to [{}]", this.maxRetention == null ? null : maxRetention.getStringRep());
167+
this.dataGlobalRetention = createDataStreamGlobalRetention(false);
168+
this.failuresGlobalRetention = createDataStreamGlobalRetention(true);
169+
logger.info("Updated global max retention to [{}]", this.maxRetention == null ? null : maxRetention.getStringRep());
122170
}
123171

124172
private void setDefaultRetention(TimeValue defaultRetention) {
125173
this.defaultRetention = getSettingValueOrNull(defaultRetention);
126-
logger.info("Updated default factory retention to [{}]", this.defaultRetention == null ? null : defaultRetention.getStringRep());
174+
this.dataGlobalRetention = createDataStreamGlobalRetention(false);
175+
logger.info("Updated global default retention to [{}]", this.defaultRetention == null ? null : defaultRetention.getStringRep());
176+
}
177+
178+
private void setFailuresDefaultRetention(TimeValue failuresDefaultRetention) {
179+
this.failuresDefaultRetention = getSettingValueOrNull(failuresDefaultRetention);
180+
this.failuresGlobalRetention = createDataStreamGlobalRetention(true);
181+
logger.info(
182+
"Updated failures default retention to [{}]",
183+
this.failuresDefaultRetention == null ? null : failuresDefaultRetention.getStringRep()
184+
);
127185
}
128186

129187
private static void validateIsolatedRetentionValue(@Nullable TimeValue retention, String settingName) {
@@ -150,12 +208,36 @@ private static void validateGlobalRetentionConfiguration(@Nullable TimeValue def
150208
}
151209
}
152210

211+
/**
212+
* @return the global retention of backing indices
213+
*/
153214
@Nullable
154215
public DataStreamGlobalRetention get() {
155-
if (areDefined() == false) {
216+
return get(false);
217+
}
218+
219+
/**
220+
* Returns the global retention that applies to the data or failures of a data stream
221+
* @param failureStore true if we are retrieving the global retention that applies to the failure store, false otherwise.
222+
*/
223+
@Nullable
224+
public DataStreamGlobalRetention get(boolean failureStore) {
225+
return failureStore ? failuresGlobalRetention : dataGlobalRetention;
226+
}
227+
228+
@Nullable
229+
private DataStreamGlobalRetention createDataStreamGlobalRetention(boolean failureStore) {
230+
if (areDefined(failureStore) == false) {
156231
return null;
157232
}
158-
return new DataStreamGlobalRetention(getDefaultRetention(), getMaxRetention());
233+
TimeValue defaultRetention = getDefaultRetention(failureStore);
234+
TimeValue maxRetention = getMaxRetention();
235+
// We ensure that we create a valid DataStreamGlobalRetention where the default is less than or equal to the max.
236+
// If it is not, we drop the default (set it to null) and keep only the max.
237+
if (defaultRetention != null && maxRetention != null && defaultRetention.getMillis() > maxRetention.getMillis()) {
238+
return new DataStreamGlobalRetention(null, getMaxRetention());
239+
}
240+
return new DataStreamGlobalRetention(defaultRetention, maxRetention);
159241
}
160242

161243
/**
@@ -169,4 +251,8 @@ public DataStreamGlobalRetention get() {
169251
private static TimeValue getSettingValueOrNull(TimeValue value) {
170252
return value == null || value.equals(TimeValue.MINUS_ONE) ? null : value;
171253
}
254+
255+
private boolean areDefined(boolean failureStore) {
256+
return getDefaultRetention(failureStore) != null || getMaxRetention() != null;
257+
}
172258
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,10 @@ public Tuple<TimeValue, RetentionSource> getEffectiveDataRetentionWithSource(
233233
}
234234
if (dataRetention() == null) {
235235
return globalRetention.defaultRetention() != null
236-
? Tuple.tuple(globalRetention.defaultRetention(), RetentionSource.DEFAULT_GLOBAL_RETENTION)
236+
? Tuple.tuple(
237+
globalRetention.defaultRetention(),
238+
targetsFailureStore() ? RetentionSource.DEFAULT_FAILURES_RETENTION : RetentionSource.DEFAULT_GLOBAL_RETENTION
239+
)
237240
: Tuple.tuple(globalRetention.maxRetention(), RetentionSource.MAX_GLOBAL_RETENTION);
238241
}
239242
if (globalRetention.maxRetention() != null && globalRetention.maxRetention().getMillis() < dataRetention().getMillis()) {
@@ -510,7 +513,8 @@ public static ToXContent.Params addEffectiveRetentionParams(ToXContent.Params pa
510513
public enum RetentionSource {
511514
DATA_STREAM_CONFIGURATION,
512515
DEFAULT_GLOBAL_RETENTION,
513-
MAX_GLOBAL_RETENTION;
516+
MAX_GLOBAL_RETENTION,
517+
DEFAULT_FAILURES_RETENTION;
514518

515519
public String displayName() {
516520
return this.toString().toLowerCase(Locale.ROOT);

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ public void apply(Settings value, Settings current, Settings previous) {
631631
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
632632
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
633633
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
634+
DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING,
634635
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
635636
DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING,
636637
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING,

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettingsTests.java

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public void testDefaults() {
2727

2828
assertThat(globalRetentionSettings.getDefaultRetention(), nullValue());
2929
assertThat(globalRetentionSettings.getMaxRetention(), nullValue());
30+
assertThat(globalRetentionSettings.get(false), nullValue());
31+
assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(TimeValue.timeValueDays(30), null)));
3032
}
3133

3234
public void testMonitorsDefaultRetention() {
@@ -43,7 +45,8 @@ public void testMonitorsDefaultRetention() {
4345
.build();
4446
clusterSettings.applySettings(newSettings);
4547

46-
assertThat(newDefaultRetention, equalTo(globalRetentionSettings.getDefaultRetention()));
48+
assertThat(globalRetentionSettings.getDefaultRetention(), equalTo(newDefaultRetention));
49+
assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null)));
4750

4851
// Test invalid update
4952
Settings newInvalidSettings = Settings.builder()
@@ -57,20 +60,33 @@ public void testMonitorsDefaultRetention() {
5760
exception.getCause().getMessage(),
5861
containsString("Setting 'data_streams.lifecycle.retention.default' should be greater than")
5962
);
63+
assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null)));
6064
}
6165

6266
public void testMonitorsMaxRetention() {
6367
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings();
6468
DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings);
6569

6670
// Test valid update
67-
TimeValue newMaxRetention = TimeValue.timeValueDays(randomIntBetween(10, 30));
71+
TimeValue newMaxRetention = TimeValue.timeValueDays(randomIntBetween(10, 29));
6872
Settings newSettings = Settings.builder()
6973
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), newMaxRetention.toHumanReadableString(0))
7074
.build();
7175
clusterSettings.applySettings(newSettings);
7276

73-
assertThat(newMaxRetention, equalTo(globalRetentionSettings.getMaxRetention()));
77+
assertThat(globalRetentionSettings.getMaxRetention(), equalTo(newMaxRetention));
78+
assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention)));
79+
assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention)));
80+
81+
newMaxRetention = TimeValue.timeValueDays(100);
82+
newSettings = Settings.builder()
83+
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), newMaxRetention.toHumanReadableString(0))
84+
.build();
85+
clusterSettings.applySettings(newSettings);
86+
assertThat(
87+
globalRetentionSettings.get(true),
88+
equalTo(DataStreamGlobalRetention.create(TimeValue.timeValueDays(30), newMaxRetention))
89+
);
7490

7591
// Test invalid update
7692
Settings newInvalidSettings = Settings.builder()
@@ -84,11 +100,57 @@ public void testMonitorsMaxRetention() {
84100
exception.getCause().getMessage(),
85101
containsString("Setting 'data_streams.lifecycle.retention.max' should be greater than")
86102
);
103+
assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention)));
104+
}
105+
106+
public void testMonitorsDefaultFailuresRetention() {
107+
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings();
108+
DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings);
109+
110+
// Test valid update
111+
TimeValue newDefaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 10));
112+
Settings newSettings = Settings.builder()
113+
.put(
114+
DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(),
115+
newDefaultRetention.toHumanReadableString(0)
116+
)
117+
.build();
118+
clusterSettings.applySettings(newSettings);
119+
120+
assertThat(globalRetentionSettings.getDefaultRetention(true), equalTo(newDefaultRetention));
121+
assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null)));
122+
123+
// Test update default failures retention to infinite retention
124+
newDefaultRetention = TimeValue.MINUS_ONE;
125+
newSettings = Settings.builder()
126+
.put(
127+
DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(),
128+
newDefaultRetention.toHumanReadableString(0)
129+
)
130+
.build();
131+
clusterSettings.applySettings(newSettings);
132+
133+
assertThat(globalRetentionSettings.getDefaultRetention(true), nullValue());
134+
assertThat(globalRetentionSettings.get(true), nullValue());
135+
136+
// Test invalid update
137+
Settings newInvalidSettings = Settings.builder()
138+
.put(DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), TimeValue.ZERO)
139+
.build();
140+
IllegalArgumentException exception = expectThrows(
141+
IllegalArgumentException.class,
142+
() -> clusterSettings.applySettings(newInvalidSettings)
143+
);
144+
assertThat(
145+
exception.getCause().getMessage(),
146+
containsString("Setting 'data_streams.lifecycle.retention.failures_default' should be greater than")
147+
);
148+
assertThat(globalRetentionSettings.get(true), nullValue());
87149
}
88150

89151
public void testCombinationValidation() {
90152
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings();
91-
DataStreamGlobalRetentionSettings.create(clusterSettings);
153+
DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings);
92154

93155
// Test invalid update
94156
Settings newInvalidSettings = Settings.builder()
@@ -105,5 +167,17 @@ public void testCombinationValidation() {
105167
"Setting [data_streams.lifecycle.retention.default=90d] cannot be greater than [data_streams.lifecycle.retention.max=30d]"
106168
)
107169
);
170+
171+
// Test valid update even if the failures default is greater than max.
172+
Settings newValidSettings = Settings.builder()
173+
.put(DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), TimeValue.timeValueDays(90))
174+
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), TimeValue.timeValueDays(30))
175+
.build();
176+
clusterSettings.applySettings(newValidSettings);
177+
assertThat(dataStreamGlobalRetentionSettings.getDefaultRetention(true), equalTo(TimeValue.timeValueDays(90)));
178+
assertThat(
179+
dataStreamGlobalRetentionSettings.getDefaultRetention(true),
180+
equalTo(DataStreamGlobalRetention.create(null, TimeValue.timeValueDays(30)))
181+
);
108182
}
109183
}

0 commit comments

Comments
 (0)